1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.concurrent.FastThreadLocal;
19 import io.netty.util.concurrent.FastThreadLocalThread;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.PlatformDependent;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26 import org.jctools.queues.MessagePassingQueue;
27 import org.jetbrains.annotations.VisibleForTesting;
28
29 import java.util.ArrayDeque;
30 import java.util.Queue;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33 import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34 import static java.lang.Math.max;
35 import static java.lang.Math.min;
36
37
38
39
40
41
42 public abstract class Recycler<T> {
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44 private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45 @Override
46 public void recycle(Object object) {
47
48 }
49
50 @Override
51 public void unguardedRecycle(final Object object) {
52
53 }
54
55 @Override
56 public String toString() {
57 return "NOOP_HANDLE";
58 }
59 };
60 private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
61 private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
62 private static final int RATIO;
63 private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
64 private static final boolean BLOCKING_POOL;
65 private static final boolean BATCH_FAST_TL_ONLY;
66
67 static {
68
69
70
71 int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
72 SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
73 if (maxCapacityPerThread < 0) {
74 maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
75 }
76
77 DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
78 DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
79
80
81
82
83 RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
84
85 BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
86 BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
87
88 if (logger.isDebugEnabled()) {
89 if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
90 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
91 logger.debug("-Dio.netty.recycler.ratio: disabled");
92 logger.debug("-Dio.netty.recycler.chunkSize: disabled");
93 logger.debug("-Dio.netty.recycler.blocking: disabled");
94 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
95 } else {
96 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
98 logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
99 logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
100 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
101 }
102 }
103 }
104
105 private final int maxCapacityPerThread;
106 private final int interval;
107 private final int chunkSize;
108 private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109 @Override
110 protected LocalPool<T> initialValue() {
111 return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112 }
113
114 @Override
115 protected void onRemoval(LocalPool<T> value) throws Exception {
116 super.onRemoval(value);
117 MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118 value.pooledHandles = null;
119 value.owner = null;
120 handles.clear();
121 }
122 };
123
124 protected Recycler() {
125 this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126 }
127
128 protected Recycler(int maxCapacityPerThread) {
129 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130 }
131
132
133
134
135
136 @Deprecated
137 @SuppressWarnings("unused")
138 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140 }
141
142
143
144
145
146 @Deprecated
147 @SuppressWarnings("unused")
148 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149 int ratio, int maxDelayedQueuesPerThread) {
150 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151 }
152
153
154
155
156
157 @Deprecated
158 @SuppressWarnings("unused")
159 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162 }
163
164 protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165 interval = max(0, ratio);
166 if (maxCapacityPerThread <= 0) {
167 this.maxCapacityPerThread = 0;
168 this.chunkSize = 0;
169 } else {
170 this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171 this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172 }
173 }
174
175 @SuppressWarnings("unchecked")
176 public final T get() {
177 if (maxCapacityPerThread == 0 || PlatformDependent.isVirtualThread(Thread.currentThread())) {
178 return newObject((Handle<T>) NOOP_HANDLE);
179 }
180 LocalPool<T> localPool = threadLocal.get();
181 DefaultHandle<T> handle = localPool.claim();
182 T obj;
183 if (handle == null) {
184 handle = localPool.newHandle();
185 if (handle != null) {
186 obj = newObject(handle);
187 handle.set(obj);
188 } else {
189 obj = newObject((Handle<T>) NOOP_HANDLE);
190 }
191 } else {
192 obj = handle.get();
193 }
194
195 return obj;
196 }
197
198
199
200
201 @Deprecated
202 public final boolean recycle(T o, Handle<T> handle) {
203 if (handle == NOOP_HANDLE) {
204 return false;
205 }
206
207 handle.recycle(o);
208 return true;
209 }
210
211 @VisibleForTesting
212 final int threadLocalSize() {
213 if (PlatformDependent.isVirtualThread(Thread.currentThread())) {
214 return 0;
215 }
216 LocalPool<T> localPool = threadLocal.getIfExists();
217 return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
218 }
219
220
221
222
223 protected abstract T newObject(Handle<T> handle);
224
225 @SuppressWarnings("ClassNameSameAsAncestorName")
226 public interface Handle<T> extends ObjectPool.Handle<T> { }
227
228 @UnstableApi
229 public abstract static class EnhancedHandle<T> implements Handle<T> {
230
231 public abstract void unguardedRecycle(Object object);
232
233 private EnhancedHandle() {
234 }
235 }
236
237 private static final class DefaultHandle<T> extends EnhancedHandle<T> {
238 private static final int STATE_CLAIMED = 0;
239 private static final int STATE_AVAILABLE = 1;
240 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
241 static {
242 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
243
244 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
245 }
246
247 private volatile int state;
248 private final LocalPool<T> localPool;
249 private T value;
250
251 DefaultHandle(LocalPool<T> localPool) {
252 this.localPool = localPool;
253 }
254
255 @Override
256 public void recycle(Object object) {
257 if (object != value) {
258 throw new IllegalArgumentException("object does not belong to handle");
259 }
260 localPool.release(this, true);
261 }
262
263 @Override
264 public void unguardedRecycle(Object object) {
265 if (object != value) {
266 throw new IllegalArgumentException("object does not belong to handle");
267 }
268 localPool.release(this, false);
269 }
270
271 T get() {
272 return value;
273 }
274
275 void set(T value) {
276 this.value = value;
277 }
278
279 void toClaimed() {
280 assert state == STATE_AVAILABLE;
281 STATE_UPDATER.lazySet(this, STATE_CLAIMED);
282 }
283
284 void toAvailable() {
285 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
286 if (prev == STATE_AVAILABLE) {
287 throw new IllegalStateException("Object has been recycled already.");
288 }
289 }
290
291 void unguardedToAvailable() {
292 int prev = state;
293 if (prev == STATE_AVAILABLE) {
294 throw new IllegalStateException("Object has been recycled already.");
295 }
296 STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
297 }
298 }
299
300 private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
301 private final int ratioInterval;
302 private final int chunkSize;
303 private final ArrayDeque<DefaultHandle<T>> batch;
304 private volatile Thread owner;
305 private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
306 private int ratioCounter;
307
308 @SuppressWarnings("unchecked")
309 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
310 this.ratioInterval = ratioInterval;
311 this.chunkSize = chunkSize;
312 batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
313 Thread currentThread = Thread.currentThread();
314 owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;
315 if (BLOCKING_POOL) {
316 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
317 } else {
318 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
319 }
320 ratioCounter = ratioInterval;
321 }
322
323 DefaultHandle<T> claim() {
324 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
325 if (handles == null) {
326 return null;
327 }
328 if (batch.isEmpty()) {
329 handles.drain(this, chunkSize);
330 }
331 DefaultHandle<T> handle = batch.pollLast();
332 if (null != handle) {
333 handle.toClaimed();
334 }
335 return handle;
336 }
337
338 void release(DefaultHandle<T> handle, boolean guarded) {
339 if (guarded) {
340 handle.toAvailable();
341 } else {
342 handle.unguardedToAvailable();
343 }
344 Thread owner = this.owner;
345 if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
346 accept(handle);
347 } else if (owner != null && isTerminated(owner)) {
348 this.owner = null;
349 pooledHandles = null;
350 } else {
351 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
352 if (handles != null) {
353 handles.relaxedOffer(handle);
354 }
355 }
356 }
357
358 private static boolean isTerminated(Thread owner) {
359
360
361 return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
362 }
363
364 DefaultHandle<T> newHandle() {
365 if (++ratioCounter >= ratioInterval) {
366 ratioCounter = 0;
367 return new DefaultHandle<T>(this);
368 }
369 return null;
370 }
371
372 @Override
373 public void accept(DefaultHandle<T> e) {
374 batch.addLast(e);
375 }
376 }
377
378
379
380
381
382
383
384 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
385 private final Queue<T> deque;
386 private final int maxCapacity;
387
388 BlockingMessageQueue(int maxCapacity) {
389 this.maxCapacity = maxCapacity;
390
391
392
393
394
395
396
397
398
399 deque = new ArrayDeque<T>();
400 }
401
402 @Override
403 public synchronized boolean offer(T e) {
404 if (deque.size() == maxCapacity) {
405 return false;
406 }
407 return deque.offer(e);
408 }
409
410 @Override
411 public synchronized T poll() {
412 return deque.poll();
413 }
414
415 @Override
416 public synchronized T peek() {
417 return deque.peek();
418 }
419
420 @Override
421 public synchronized int size() {
422 return deque.size();
423 }
424
425 @Override
426 public synchronized void clear() {
427 deque.clear();
428 }
429
430 @Override
431 public synchronized boolean isEmpty() {
432 return deque.isEmpty();
433 }
434
435 @Override
436 public int capacity() {
437 return maxCapacity;
438 }
439
440 @Override
441 public boolean relaxedOffer(T e) {
442 return offer(e);
443 }
444
445 @Override
446 public T relaxedPoll() {
447 return poll();
448 }
449
450 @Override
451 public T relaxedPeek() {
452 return peek();
453 }
454
455 @Override
456 public int drain(Consumer<T> c, int limit) {
457 T obj;
458 int i = 0;
459 for (; i < limit && (obj = poll()) != null; i++) {
460 c.accept(obj);
461 }
462 return i;
463 }
464
465 @Override
466 public int fill(Supplier<T> s, int limit) {
467 throw new UnsupportedOperationException();
468 }
469
470 @Override
471 public int drain(Consumer<T> c) {
472 throw new UnsupportedOperationException();
473 }
474
475 @Override
476 public int fill(Supplier<T> s) {
477 throw new UnsupportedOperationException();
478 }
479
480 @Override
481 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
482 throw new UnsupportedOperationException();
483 }
484
485 @Override
486 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
487 throw new UnsupportedOperationException();
488 }
489 }
490 }