1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util;
17
18 import io.netty5.util.concurrent.FastThreadLocal;
19 import io.netty5.util.internal.ObjectPool;
20 import io.netty5.util.internal.PlatformDependent;
21 import io.netty5.util.internal.SystemPropertyUtil;
22 import io.netty5.util.internal.logging.InternalLogger;
23 import io.netty5.util.internal.logging.InternalLoggerFactory;
24 import org.jctools.queues.MessagePassingQueue;
25
26 import java.util.ArrayDeque;
27 import java.util.Queue;
28 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29
30 import static io.netty5.util.internal.PlatformDependent.newMpscQueue;
31 import static java.lang.Math.max;
32 import static java.lang.Math.min;
33
34
35
36
37
38
39 public abstract class Recycler<T> {
40 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
41 private static final Handle<?> NOOP_HANDLE = new Handle<>() {
42 @Override
43 public void recycle(Object object) {
44
45 }
46
47 @Override
48 public String toString() {
49 return "NOOP_HANDLE";
50 }
51 };
52 private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
53 private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
54 private static final int RATIO;
55 private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
56 private static final boolean BLOCKING_POOL;
57
58 static {
59
60
61
62 int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty5.recycler.maxCapacityPerThread",
63 SystemPropertyUtil.getInt("io.netty5.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
64 if (maxCapacityPerThread < 0) {
65 maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
66 }
67
68 DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
69 DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty5.recycler.chunkSize", 32);
70
71
72
73
74 RATIO = max(0, SystemPropertyUtil.getInt("io.netty5.recycler.ratio", 8));
75
76 BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty5.recycler.blocking", false);
77
78 if (logger.isDebugEnabled()) {
79 if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
80 logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: disabled");
81 logger.debug("-Dio.netty5.recycler.ratio: disabled");
82 logger.debug("-Dio.netty5.recycler.chunkSize: disabled");
83 logger.debug("-Dio.netty5.recycler.blocking: disabled");
84 } else {
85 logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
86 logger.debug("-Dio.netty5.recycler.ratio: {}", RATIO);
87 logger.debug("-Dio.netty5.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
88 logger.debug("-Dio.netty5.recycler.blocking: {}", BLOCKING_POOL);
89 }
90 }
91 }
92
93 private final int maxCapacityPerThread;
94 private final int interval;
95 private final int chunkSize;
96 private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<>() {
97 @Override
98 protected LocalPool<T> initialValue() {
99 return new LocalPool<>(maxCapacityPerThread, interval, chunkSize);
100 }
101
102 @Override
103 protected void onRemoval(LocalPool<T> value) throws Exception {
104 super.onRemoval(value);
105 MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
106 value.pooledHandles = null;
107 handles.clear();
108 }
109 };
110
111 protected Recycler() {
112 this(DEFAULT_MAX_CAPACITY_PER_THREAD);
113 }
114
115 protected Recycler(int maxCapacityPerThread) {
116 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
117 }
118
119
120
121
122
123 @Deprecated
124 @SuppressWarnings("unused")
125 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
126 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
127 }
128
129
130
131
132
133 @Deprecated
134 @SuppressWarnings("unused")
135 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
136 int ratio, int maxDelayedQueuesPerThread) {
137 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
138 }
139
140
141
142
143
144 @Deprecated
145 @SuppressWarnings("unused")
146 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
147 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
148 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
149 }
150
151 protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
152 interval = max(0, ratio);
153 if (maxCapacityPerThread <= 0) {
154 this.maxCapacityPerThread = 0;
155 this.chunkSize = 0;
156 } else {
157 this.maxCapacityPerThread = max(4, maxCapacityPerThread);
158 this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
159 }
160 }
161
162 @SuppressWarnings("unchecked")
163 public final T get() {
164 if (maxCapacityPerThread == 0) {
165 return newObject((Handle<T>) NOOP_HANDLE);
166 }
167 LocalPool<T> localPool = threadLocal.get();
168 DefaultHandle<T> handle = localPool.claim();
169 T obj;
170 if (handle == null) {
171 handle = localPool.newHandle();
172 if (handle != null) {
173 obj = newObject(handle);
174 handle.set(obj);
175 } else {
176 obj = newObject((Handle<T>) NOOP_HANDLE);
177 }
178 } else {
179 obj = handle.get();
180 }
181
182 return obj;
183 }
184
185
186
187
188 @Deprecated
189 public final boolean recycle(T o, Handle<T> handle) {
190 if (handle == NOOP_HANDLE) {
191 return false;
192 }
193
194 handle.recycle(o);
195 return true;
196 }
197
198 final int threadLocalSize() {
199 return threadLocal.get().pooledHandles.size();
200 }
201
202 protected abstract T newObject(Handle<T> handle);
203
204 @SuppressWarnings("ClassNameSameAsAncestorName")
205 public interface Handle<T> extends ObjectPool.Handle<T> { }
206
207 private static final class DefaultHandle<T> implements Handle<T> {
208 private static final int STATE_CLAIMED = 0;
209 private static final int STATE_AVAILABLE = 1;
210 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
211 static {
212 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
213
214 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
215 }
216
217 @SuppressWarnings({"FieldMayBeFinal", "unused"})
218 private volatile int state;
219 private final LocalPool<T> localPool;
220 private T value;
221
222 DefaultHandle(LocalPool<T> localPool) {
223 this.localPool = localPool;
224 }
225
226 @Override
227 public void recycle(Object object) {
228 if (object != value) {
229 throw new IllegalArgumentException("object does not belong to handle");
230 }
231 localPool.release(this);
232 }
233
234 T get() {
235 return value;
236 }
237
238 void set(T value) {
239 this.value = value;
240 }
241
242 boolean availableToClaim() {
243 if (state != STATE_AVAILABLE) {
244 return false;
245 }
246 return STATE_UPDATER.compareAndSet(this, STATE_AVAILABLE, STATE_CLAIMED);
247 }
248
249 void toAvailable() {
250 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
251 if (prev == STATE_AVAILABLE) {
252 throw new IllegalStateException("Object has been recycled already.");
253 }
254 }
255 }
256
257 private static final class LocalPool<T> {
258 private final int ratioInterval;
259 private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
260 private int ratioCounter;
261
262 @SuppressWarnings("unchecked")
263 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
264 this.ratioInterval = ratioInterval;
265 if (BLOCKING_POOL) {
266 pooledHandles = new BlockingMessageQueue<>(maxCapacity);
267 } else {
268 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
269 }
270 ratioCounter = ratioInterval;
271 }
272
273 DefaultHandle<T> claim() {
274 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
275 if (handles == null) {
276 return null;
277 }
278 DefaultHandle<T> handle;
279 do {
280 handle = handles.relaxedPoll();
281 } while (handle != null && !handle.availableToClaim());
282 return handle;
283 }
284
285 void release(DefaultHandle<T> handle) {
286 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
287 handle.toAvailable();
288 if (handles != null) {
289 handles.relaxedOffer(handle);
290 }
291 }
292
293 DefaultHandle<T> newHandle() {
294 if (++ratioCounter >= ratioInterval) {
295 ratioCounter = 0;
296 return new DefaultHandle<>(this);
297 }
298 return null;
299 }
300 }
301
302
303
304
305
306
307
308 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
309 private final Queue<T> deque;
310 private final int maxCapacity;
311
312 BlockingMessageQueue(int maxCapacity) {
313 this.maxCapacity = maxCapacity;
314
315
316
317
318
319
320
321
322
323 deque = new ArrayDeque<>();
324 }
325
326 @Override
327 public synchronized boolean offer(T e) {
328 if (deque.size() == maxCapacity) {
329 return false;
330 }
331 return deque.offer(e);
332 }
333
334 @Override
335 public synchronized T poll() {
336 return deque.poll();
337 }
338
339 @Override
340 public synchronized T peek() {
341 return deque.peek();
342 }
343
344 @Override
345 public synchronized int size() {
346 return deque.size();
347 }
348
349 @Override
350 public synchronized void clear() {
351 deque.clear();
352 }
353
354 @Override
355 public synchronized boolean isEmpty() {
356 return deque.isEmpty();
357 }
358
359 @Override
360 public int capacity() {
361 return maxCapacity;
362 }
363
364 @Override
365 public boolean relaxedOffer(T e) {
366 return offer(e);
367 }
368
369 @Override
370 public T relaxedPoll() {
371 return poll();
372 }
373
374 @Override
375 public T relaxedPeek() {
376 return peek();
377 }
378
379 @Override
380 public int drain(Consumer<T> c, int limit) {
381 throw new UnsupportedOperationException();
382 }
383
384 @Override
385 public int fill(Supplier<T> s, int limit) {
386 throw new UnsupportedOperationException();
387 }
388
389 @Override
390 public int drain(Consumer<T> c) {
391 throw new UnsupportedOperationException();
392 }
393
394 @Override
395 public int fill(Supplier<T> s) {
396 throw new UnsupportedOperationException();
397 }
398
399 @Override
400 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
401 throw new UnsupportedOperationException();
402 }
403
404 @Override
405 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
406 throw new UnsupportedOperationException();
407 }
408 }
409 }