1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.concurrent.FastThreadLocal;
19 import io.netty.util.concurrent.FastThreadLocalThread;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.PlatformDependent;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26 import org.jctools.queues.MessagePassingQueue;
27 import org.jetbrains.annotations.VisibleForTesting;
28
29 import java.util.ArrayDeque;
30 import java.util.Queue;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33 import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34 import static java.lang.Math.max;
35 import static java.lang.Math.min;
36
37
38
39
40
41
42 public abstract class Recycler<T> {
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44 private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45 @Override
46 public void recycle(Object object) {
47
48 }
49
50 @Override
51 public void unguardedRecycle(final Object object) {
52
53 }
54
55 @Override
56 public String toString() {
57 return "NOOP_HANDLE";
58 }
59 };
60 private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
61 private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
62 private static final int RATIO;
63 private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
64 private static final boolean BLOCKING_POOL;
65 private static final boolean BATCH_FAST_TL_ONLY;
66
67 static {
68
69
70
71 int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
72 SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
73 if (maxCapacityPerThread < 0) {
74 maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
75 }
76
77 DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
78 DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
79
80
81
82
83 RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
84
85 BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
86 BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
87
88 if (logger.isDebugEnabled()) {
89 if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
90 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
91 logger.debug("-Dio.netty.recycler.ratio: disabled");
92 logger.debug("-Dio.netty.recycler.chunkSize: disabled");
93 logger.debug("-Dio.netty.recycler.blocking: disabled");
94 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
95 } else {
96 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
98 logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
99 logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
100 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
101 }
102 }
103 }
104
105 private final int maxCapacityPerThread;
106 private final int interval;
107 private final int chunkSize;
108 private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109 @Override
110 protected LocalPool<T> initialValue() {
111 return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112 }
113
114 @Override
115 protected void onRemoval(LocalPool<T> value) throws Exception {
116 super.onRemoval(value);
117 MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118 value.pooledHandles = null;
119 value.owner = null;
120 handles.clear();
121 }
122 };
123
124 protected Recycler() {
125 this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126 }
127
128 protected Recycler(int maxCapacityPerThread) {
129 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130 }
131
132
133
134
135
136 @Deprecated
137 @SuppressWarnings("unused")
138 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140 }
141
142
143
144
145
146 @Deprecated
147 @SuppressWarnings("unused")
148 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149 int ratio, int maxDelayedQueuesPerThread) {
150 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151 }
152
153
154
155
156
157 @Deprecated
158 @SuppressWarnings("unused")
159 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162 }
163
164 protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165 interval = max(0, ratio);
166 if (maxCapacityPerThread <= 0) {
167 this.maxCapacityPerThread = 0;
168 this.chunkSize = 0;
169 } else {
170 this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171 this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172 }
173 }
174
175 @SuppressWarnings("unchecked")
176 public final T get() {
177 if (maxCapacityPerThread == 0) {
178 return newObject((Handle<T>) NOOP_HANDLE);
179 }
180 LocalPool<T> localPool = threadLocal.get();
181 DefaultHandle<T> handle = localPool.claim();
182 T obj;
183 if (handle == null) {
184 handle = localPool.newHandle();
185 if (handle != null) {
186 obj = newObject(handle);
187 handle.set(obj);
188 } else {
189 obj = newObject((Handle<T>) NOOP_HANDLE);
190 }
191 } else {
192 obj = handle.get();
193 }
194
195 return obj;
196 }
197
198
199
200
201 @Deprecated
202 public final boolean recycle(T o, Handle<T> handle) {
203 if (handle == NOOP_HANDLE) {
204 return false;
205 }
206
207 handle.recycle(o);
208 return true;
209 }
210
211 @VisibleForTesting
212 final int threadLocalSize() {
213 LocalPool<T> localPool = threadLocal.getIfExists();
214 return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
215 }
216
217
218
219
220 protected abstract T newObject(Handle<T> handle);
221
222 @SuppressWarnings("ClassNameSameAsAncestorName")
223 public interface Handle<T> extends ObjectPool.Handle<T> { }
224
225 @UnstableApi
226 public abstract static class EnhancedHandle<T> implements Handle<T> {
227
228 public abstract void unguardedRecycle(Object object);
229
230 private EnhancedHandle() {
231 }
232 }
233
234 private static final class DefaultHandle<T> extends EnhancedHandle<T> {
235 private static final int STATE_CLAIMED = 0;
236 private static final int STATE_AVAILABLE = 1;
237 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
238 static {
239 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
240
241 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
242 }
243
244 private volatile int state;
245 private final LocalPool<T> localPool;
246 private T value;
247
248 DefaultHandle(LocalPool<T> localPool) {
249 this.localPool = localPool;
250 }
251
252 @Override
253 public void recycle(Object object) {
254 if (object != value) {
255 throw new IllegalArgumentException("object does not belong to handle");
256 }
257 localPool.release(this, true);
258 }
259
260 @Override
261 public void unguardedRecycle(Object object) {
262 if (object != value) {
263 throw new IllegalArgumentException("object does not belong to handle");
264 }
265 localPool.release(this, false);
266 }
267
268 T get() {
269 return value;
270 }
271
272 void set(T value) {
273 this.value = value;
274 }
275
276 void toClaimed() {
277 assert state == STATE_AVAILABLE;
278 STATE_UPDATER.lazySet(this, STATE_CLAIMED);
279 }
280
281 void toAvailable() {
282 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
283 if (prev == STATE_AVAILABLE) {
284 throw new IllegalStateException("Object has been recycled already.");
285 }
286 }
287
288 void unguardedToAvailable() {
289 int prev = state;
290 if (prev == STATE_AVAILABLE) {
291 throw new IllegalStateException("Object has been recycled already.");
292 }
293 STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
294 }
295 }
296
297 private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
298 private final int ratioInterval;
299 private final int chunkSize;
300 private final ArrayDeque<DefaultHandle<T>> batch;
301 private volatile Thread owner;
302 private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
303 private int ratioCounter;
304
305 @SuppressWarnings("unchecked")
306 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
307 this.ratioInterval = ratioInterval;
308 this.chunkSize = chunkSize;
309 batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
310 Thread currentThread = Thread.currentThread();
311 owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;
312 if (BLOCKING_POOL) {
313 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
314 } else {
315 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
316 }
317 ratioCounter = ratioInterval;
318 }
319
320 DefaultHandle<T> claim() {
321 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
322 if (handles == null) {
323 return null;
324 }
325 if (batch.isEmpty()) {
326 handles.drain(this, chunkSize);
327 }
328 DefaultHandle<T> handle = batch.pollLast();
329 if (null != handle) {
330 handle.toClaimed();
331 }
332 return handle;
333 }
334
335 void release(DefaultHandle<T> handle, boolean guarded) {
336 if (guarded) {
337 handle.toAvailable();
338 } else {
339 handle.unguardedToAvailable();
340 }
341 Thread owner = this.owner;
342 if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
343 accept(handle);
344 } else if (owner != null && isTerminated(owner)) {
345 this.owner = null;
346 pooledHandles = null;
347 } else {
348 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
349 if (handles != null) {
350 handles.relaxedOffer(handle);
351 }
352 }
353 }
354
355 private static boolean isTerminated(Thread owner) {
356
357
358 return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
359 }
360
361 DefaultHandle<T> newHandle() {
362 if (++ratioCounter >= ratioInterval) {
363 ratioCounter = 0;
364 return new DefaultHandle<T>(this);
365 }
366 return null;
367 }
368
369 @Override
370 public void accept(DefaultHandle<T> e) {
371 batch.addLast(e);
372 }
373 }
374
375
376
377
378
379
380
381 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
382 private final Queue<T> deque;
383 private final int maxCapacity;
384
385 BlockingMessageQueue(int maxCapacity) {
386 this.maxCapacity = maxCapacity;
387
388
389
390
391
392
393
394
395
396 deque = new ArrayDeque<T>();
397 }
398
399 @Override
400 public synchronized boolean offer(T e) {
401 if (deque.size() == maxCapacity) {
402 return false;
403 }
404 return deque.offer(e);
405 }
406
407 @Override
408 public synchronized T poll() {
409 return deque.poll();
410 }
411
412 @Override
413 public synchronized T peek() {
414 return deque.peek();
415 }
416
417 @Override
418 public synchronized int size() {
419 return deque.size();
420 }
421
422 @Override
423 public synchronized void clear() {
424 deque.clear();
425 }
426
427 @Override
428 public synchronized boolean isEmpty() {
429 return deque.isEmpty();
430 }
431
432 @Override
433 public int capacity() {
434 return maxCapacity;
435 }
436
437 @Override
438 public boolean relaxedOffer(T e) {
439 return offer(e);
440 }
441
442 @Override
443 public T relaxedPoll() {
444 return poll();
445 }
446
447 @Override
448 public T relaxedPeek() {
449 return peek();
450 }
451
452 @Override
453 public int drain(Consumer<T> c, int limit) {
454 T obj;
455 int i = 0;
456 for (; i < limit && (obj = poll()) != null; i++) {
457 c.accept(obj);
458 }
459 return i;
460 }
461
462 @Override
463 public int fill(Supplier<T> s, int limit) {
464 throw new UnsupportedOperationException();
465 }
466
467 @Override
468 public int drain(Consumer<T> c) {
469 throw new UnsupportedOperationException();
470 }
471
472 @Override
473 public int fill(Supplier<T> s) {
474 throw new UnsupportedOperationException();
475 }
476
477 @Override
478 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
479 throw new UnsupportedOperationException();
480 }
481
482 @Override
483 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
484 throw new UnsupportedOperationException();
485 }
486 }
487 }