1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.concurrent.FastThreadLocal;
19 import io.netty.util.concurrent.FastThreadLocalThread;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.PlatformDependent;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26 import org.jctools.queues.MessagePassingQueue;
27 import org.jetbrains.annotations.VisibleForTesting;
28
29 import java.util.ArrayDeque;
30 import java.util.Objects;
31 import java.util.Queue;
32 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33
34 import static io.netty.util.internal.PlatformDependent.newFixedMpmcQueue;
35 import static io.netty.util.internal.PlatformDependent.newMpscQueue;
36 import static java.lang.Math.max;
37 import static java.lang.Math.min;
38
39
40
41
42
43
44 public abstract class Recycler<T> {
/** Logger used by the static initializer to report the effective recycler settings at debug level. */
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
46
47
48
49
50
51 private static final class LocalPoolHandle<T> extends EnhancedHandle<T> {
52 private final UnguardedLocalPool<T> pool;
53
54 private LocalPoolHandle(UnguardedLocalPool<T> pool) {
55 this.pool = pool;
56 }
57
58 @Override
59 public void recycle(T object) {
60 UnguardedLocalPool<T> pool = this.pool;
61 if (pool != null) {
62 pool.release(object);
63 }
64 }
65
66 @Override
67 public void unguardedRecycle(final Object object) {
68 UnguardedLocalPool<T> pool = this.pool;
69 if (pool != null) {
70 pool.release((T) object);
71 }
72 }
73 }
74
/** Handle attached to no pool: recycling through it does nothing. Compared by identity. */
private static final EnhancedHandle<?> NOOP_HANDLE = new LocalPoolHandle<>(null);
/** Zero-capacity pool used whenever pooling is disabled; it never retains an object. */
private static final UnguardedLocalPool<?> NOOP_LOCAL_POOL = new UnguardedLocalPool<>(0);
/** Fallback capacity used when no (or a negative) capacity is configured. */
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
/** Effective default capacity, resolved from system properties in the static initializer. */
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
/** Default allocation ratio (never negative), from {@code io.netty.recycler.ratio}. */
private static final int RATIO;
/** Default chunk size, from {@code io.netty.recycler.chunkSize}. */
private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
/** When {@code true}, pools are backed by the synchronized {@link BlockingMessageQueue}. */
private static final boolean BLOCKING_POOL;
/**
 * When {@code true}, a thread-local pool only gets an owner thread (and thus a batch array)
 * if the creating thread will clean up its fast thread locals.
 */
private static final boolean BATCH_FAST_TL_ONLY;

static {
    // "maxCapacityPerThread" wins over the older "maxCapacity" property; a negative
    // result falls back to the built-in default.
    final int requestedCapacity = SystemPropertyUtil.getInt(
            "io.netty.recycler.maxCapacityPerThread",
            SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
    DEFAULT_MAX_CAPACITY_PER_THREAD = requestedCapacity < 0
            ? DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD
            : requestedCapacity;

    DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
    RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
    BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
    BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);

    if (logger.isDebugEnabled()) {
        if (DEFAULT_MAX_CAPACITY_PER_THREAD != 0) {
            logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
            logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
            logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
            logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
            logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
        } else {
            // A capacity of zero switches pooling off, so the other knobs are moot.
            logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
            logger.debug("-Dio.netty.recycler.ratio: disabled");
            logger.debug("-Dio.netty.recycler.chunkSize: disabled");
            logger.debug("-Dio.netty.recycler.blocking: disabled");
            logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
        }
    }
}
121
// Exactly one of the two fields below is non-null after construction: every constructor
// assigns one of them and sets the other to null.

/** Single pool used by all callers; null when this recycler keeps one pool per thread instead. */
122 private final LocalPool<?, T> localPool;
/** Per-thread pools; null when this recycler uses the single {@code localPool}. */
123 private final FastThreadLocal<LocalPool<?, T>> threadLocalPool;
124
125
126
127
128
129
130
131
132
133
134
135
136 protected Recycler(int maxCapacity, boolean unguarded) {
137 if (maxCapacity <= 0) {
138 maxCapacity = 0;
139 } else {
140 maxCapacity = max(4, maxCapacity);
141 }
142 threadLocalPool = null;
143 if (maxCapacity == 0) {
144 localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
145 } else {
146 localPool = unguarded? new UnguardedLocalPool<>(maxCapacity) : new GuardedLocalPool<>(maxCapacity);
147 }
148 }
149
150
151
152
153
154
155
156
/**
 * Creates a recycler that keeps one pool per thread, using the default capacity, ratio and
 * chunk size resolved by the static initializer.
 *
 * @param unguarded {@code true} to create unguarded pools, {@code false} to create guarded ones
 */
157 protected Recycler(boolean unguarded) {
158 this(DEFAULT_MAX_CAPACITY_PER_THREAD, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD, unguarded);
159 }
160
161
162
163
164
165
166
167
168
169
170
/**
 * Creates a recycler with a single pool owned by {@code owner} (no thread-local storage),
 * using the default capacity, ratio and chunk size.
 *
 * @param owner     thread that owns the pool; must be non-null unless the default capacity is 0
 * @param unguarded {@code true} to create an unguarded pool, {@code false} to create a guarded one
 */
171 protected Recycler(Thread owner, boolean unguarded) {
172 this(DEFAULT_MAX_CAPACITY_PER_THREAD, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD, owner, unguarded);
173 }
174
/**
 * Creates a guarded, per-thread recycler with the given capacity and the default ratio and
 * chunk size.
 *
 * @param maxCapacityPerThread requested capacity of each thread's pool
 */
175 protected Recycler(int maxCapacityPerThread) {
176 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
177 }
178
/**
 * Creates a guarded, per-thread recycler using the default capacity, ratio and chunk size.
 */
179 protected Recycler() {
180 this(DEFAULT_MAX_CAPACITY_PER_THREAD);
181 }
182
183
184
185
186
187
/**
 * Creates a per-thread recycler with an explicit chunk size and capacity and the default ratio.
 *
 * <p>Note the parameter order: the chunk size comes first here, whereas the constructors this
 * one delegates to take the capacity first.
 *
 * @param chunksSize           requested chunk size of each thread's pool
 * @param maxCapacityPerThread requested capacity of each thread's pool
 * @param unguarded            {@code true} to create unguarded pools, {@code false} for guarded ones
 */
188 protected Recycler(int chunksSize, int maxCapacityPerThread, boolean unguarded) {
189 this(maxCapacityPerThread, RATIO, chunksSize, unguarded);
190 }
191
192
193
194
195
196
197
198
199
/**
 * Creates a recycler with a single pool owned by {@code owner}, with an explicit chunk size
 * and capacity and the default ratio.
 *
 * <p>Note the parameter order: the chunk size comes first here, whereas the constructor this
 * one delegates to takes the capacity first.
 *
 * @param chunkSize            requested chunk size of the pool
 * @param maxCapacityPerThread requested capacity of the pool
 * @param owner                thread that owns the pool; must be non-null when the capacity is positive
 * @param unguarded            {@code true} to create an unguarded pool, {@code false} for a guarded one
 */
200 protected Recycler(int chunkSize, int maxCapacityPerThread, Thread owner, boolean unguarded) {
201 this(maxCapacityPerThread, RATIO, chunkSize, owner, unguarded);
202 }
203
204
205
206
207
/**
 * Creates a guarded, per-thread recycler with the given capacity.
 *
 * @param maxCapacityPerThread    requested capacity of each thread's pool
 * @param maxSharedCapacityFactor ignored
 * @deprecated {@code maxSharedCapacityFactor} has no effect; this constructor behaves exactly
 *             like {@link #Recycler(int)}.
 */
208 @Deprecated
209 @SuppressWarnings("unused")
210 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
211 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
212 }
213
214
215
216
217
/**
 * Creates a guarded, per-thread recycler with the given capacity and ratio and the default
 * chunk size.
 *
 * @param maxCapacityPerThread      requested capacity of each thread's pool
 * @param maxSharedCapacityFactor   ignored
 * @param ratio                     allocation ratio; negative values are treated as 0
 * @param maxDelayedQueuesPerThread ignored
 * @deprecated {@code maxSharedCapacityFactor} and {@code maxDelayedQueuesPerThread} have no
 *             effect; use {@link #Recycler(int, int, int)} instead.
 */
218 @Deprecated
219 @SuppressWarnings("unused")
220 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
221 int ratio, int maxDelayedQueuesPerThread) {
222 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
223 }
224
225
226
227
228
/**
 * Creates a guarded, per-thread recycler with the given capacity and ratio and the default
 * chunk size.
 *
 * @param maxCapacityPerThread      requested capacity of each thread's pool
 * @param maxSharedCapacityFactor   ignored
 * @param ratio                     allocation ratio; negative values are treated as 0
 * @param maxDelayedQueuesPerThread ignored
 * @param delayedQueueRatio         ignored
 * @deprecated {@code maxSharedCapacityFactor}, {@code maxDelayedQueuesPerThread} and
 *             {@code delayedQueueRatio} have no effect; use {@link #Recycler(int, int, int)} instead.
 */
229 @Deprecated
230 @SuppressWarnings("unused")
231 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
232 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
233 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
234 }
235
/**
 * Creates a guarded recycler that keeps one pool per thread.
 *
 * @param maxCapacityPerThread requested capacity of each thread's pool; {@code <= 0} disables pooling
 * @param interval             allocation ratio; negative values are treated as 0
 * @param chunkSize            requested chunk size of each thread's pool
 */
236 protected Recycler(int maxCapacityPerThread, int interval, int chunkSize) {
237 this(maxCapacityPerThread, interval, chunkSize, true, null, false);
238 }
239
240
241
242
243
244
/**
 * Creates a recycler that keeps one pool per thread, guarded or unguarded.
 *
 * @param maxCapacityPerThread requested capacity of each thread's pool; {@code <= 0} disables pooling
 * @param interval             allocation ratio; negative values are treated as 0
 * @param chunkSize            requested chunk size of each thread's pool
 * @param unguarded            {@code true} to create unguarded pools, {@code false} for guarded ones
 */
245 protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, boolean unguarded) {
246 this(maxCapacityPerThread, interval, chunkSize, true, null, unguarded);
247 }
248
249
250
251
252
253
/**
 * Creates a recycler with a single pool owned by {@code owner} instead of per-thread pools.
 *
 * @param maxCapacityPerThread requested capacity of the pool; {@code <= 0} disables pooling
 * @param interval             allocation ratio; negative values are treated as 0
 * @param chunkSize            requested chunk size of the pool
 * @param owner                thread that owns the pool; must be non-null when the capacity is positive
 * @param unguarded            {@code true} to create an unguarded pool, {@code false} for a guarded one
 */
254 protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, Thread owner, boolean unguarded) {
255 this(maxCapacityPerThread, interval, chunkSize, false, owner, unguarded);
256 }
257
/**
 * Common constructor behind all sized variants.
 *
 * <p>Three outcomes are possible:
 * <ul>
 *   <li>non-positive capacity: pooling is disabled and the shared no-op pool is used;</li>
 *   <li>positive capacity with thread-local storage: each thread lazily gets its own pool;</li>
 *   <li>positive capacity without thread-local storage: one pool owned by {@code owner}.</li>
 * </ul>
 *
 * @param maxCapacityPerThread  requested capacity; positive values are raised to at least 4
 * @param ratio                 allocation ratio; negative values are treated as 0
 * @param chunkSize             requested chunk size; clamped to {@code [2, capacity / 2]}
 * @param useThreadLocalStorage whether to keep one pool per thread
 * @param owner                 owner of the single pool; required when not using thread-local storage
 * @param unguarded             {@code true} for unguarded pools, {@code false} for guarded ones
 * @throws NullPointerException if a single owned pool is requested and {@code owner} is null
 */
@SuppressWarnings("unchecked")
private Recycler(int maxCapacityPerThread, int ratio, int chunkSize, boolean useThreadLocalStorage,
                 Thread owner, boolean unguarded) {
    final int interval = max(0, ratio);
    final int capacity = maxCapacityPerThread <= 0 ? 0 : max(4, maxCapacityPerThread);
    final int chunk = capacity == 0 ? 0 : max(2, min(chunkSize, capacity >> 1));

    if (capacity == 0) {
        threadLocalPool = null;
        localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
    } else if (useThreadLocalStorage) {
        threadLocalPool = new FastThreadLocal<LocalPool<?, T>>() {
            @Override
            protected LocalPool<?, T> initialValue() {
                return unguarded? new UnguardedLocalPool<>(capacity, interval, chunk) :
                        new GuardedLocalPool<>(capacity, interval, chunk);
            }

            @Override
            protected void onRemoval(LocalPool<?, T> value) throws Exception {
                super.onRemoval(value);
                // Detach the queue and owner from the pool first, then empty the queue.
                final MessagePassingQueue<?> detached = value.pooledHandles;
                value.pooledHandles = null;
                value.owner = null;
                if (detached != null) {
                    detached.clear();
                }
            }
        };
        localPool = null;
    } else {
        threadLocalPool = null;
        Objects.requireNonNull(owner, "owner");
        localPool = unguarded? new UnguardedLocalPool<>(owner, capacity, interval, chunk) :
                new GuardedLocalPool<>(owner, capacity, interval, chunk);
    }
}
302
303 @SuppressWarnings("unchecked")
304 public final T get() {
305 if (localPool != null) {
306 return localPool.getWith(this);
307 } else {
308 if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()) {
309 return newObject((Handle<T>) NOOP_HANDLE);
310 }
311 return threadLocalPool.get().getWith(this);
312 }
313 }
314
315
316
317
318
319
320
321
322
323 public static void unpinOwner(Recycler<?> recycler) {
324 if (recycler.localPool != null) {
325 recycler.localPool.owner = null;
326 }
327 }
328
329
330
331
332 @Deprecated
333 public final boolean recycle(T o, Handle<T> handle) {
334 if (handle == NOOP_HANDLE) {
335 return false;
336 }
337
338 handle.recycle(o);
339 return true;
340 }
341
342 @VisibleForTesting
343 final int threadLocalSize() {
344 if (localPool != null) {
345 return localPool.size();
346 } else {
347 if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()) {
348 return 0;
349 }
350 final LocalPool<?, T> pool = threadLocalPool.getIfExists();
351 if (pool == null) {
352 return 0;
353 }
354 return pool.size();
355 }
356 }
357
358
359
360
/**
 * Creates a new object; called when the pool has nothing to reuse.
 *
 * @param handle handle to associate with the new object; this is the no-op handle when the
 *               object is not going to be pooled
 * @return the newly created object
 */
361 protected abstract T newObject(Handle<T> handle);
362
/**
 * Recycler-specific handle type. It extends {@link ObjectPool.Handle} and declares no
 * additional members.
 *
 * @param <T> type of the object the handle recycles
 */
363 @SuppressWarnings("ClassNameSameAsAncestorName")
364 public interface Handle<T> extends ObjectPool.Handle<T> { }
365
366 @UnstableApi
367 public abstract static class EnhancedHandle<T> implements Handle<T> {
368
369 public abstract void unguardedRecycle(Object object);
370
371 private EnhancedHandle() {
372 }
373 }
374
375 private static final class DefaultHandle<T> extends EnhancedHandle<T> {
376 private static final int STATE_CLAIMED = 0;
377 private static final int STATE_AVAILABLE = 1;
378 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
379 static {
380 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
381
382 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
383 }
384
385 private volatile int state;
386 private final GuardedLocalPool<T> localPool;
387 private T value;
388
389 DefaultHandle(GuardedLocalPool<T> localPool) {
390 this.localPool = localPool;
391 }
392
393 @Override
394 public void recycle(Object object) {
395 if (object != value) {
396 throw new IllegalArgumentException("object does not belong to handle");
397 }
398 toAvailable();
399 localPool.release(this);
400 }
401
402 @Override
403 public void unguardedRecycle(Object object) {
404 if (object != value) {
405 throw new IllegalArgumentException("object does not belong to handle");
406 }
407 unguardedToAvailable();
408 localPool.release(this);
409 }
410
411 T claim() {
412 assert state == STATE_AVAILABLE;
413 STATE_UPDATER.lazySet(this, STATE_CLAIMED);
414 return value;
415 }
416
417 void set(T value) {
418 this.value = value;
419 }
420
421 private void toAvailable() {
422 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
423 if (prev == STATE_AVAILABLE) {
424 throw new IllegalStateException("Object has been recycled already.");
425 }
426 }
427
428 private void unguardedToAvailable() {
429 int prev = state;
430 if (prev == STATE_AVAILABLE) {
431 throw new IllegalStateException("Object has been recycled already.");
432 }
433 STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
434 }
435 }
436
437 private static final class GuardedLocalPool<T> extends LocalPool<DefaultHandle<T>, T> {
438
439 GuardedLocalPool(int maxCapacity) {
440 super(maxCapacity);
441 }
442
443 GuardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
444 super(owner, maxCapacity, ratioInterval, chunkSize);
445 }
446
447 GuardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
448 super(maxCapacity, ratioInterval, chunkSize);
449 }
450
451 @Override
452 public T getWith(Recycler<T> recycler) {
453 DefaultHandle<T> handle = acquire();
454 T obj;
455 if (handle == null) {
456 handle = canAllocatePooled()? new DefaultHandle<>(this) : null;
457 if (handle != null) {
458 obj = recycler.newObject(handle);
459 handle.set(obj);
460 } else {
461 obj = recycler.newObject((Handle<T>) NOOP_HANDLE);
462 }
463 } else {
464 obj = handle.claim();
465 }
466 return obj;
467 }
468 }
469
470 private static final class UnguardedLocalPool<T> extends LocalPool<T, T> {
471 private final EnhancedHandle<T> handle;
472
473 UnguardedLocalPool(int maxCapacity) {
474 super(maxCapacity);
475 handle = maxCapacity == 0? null : new LocalPoolHandle<>(this);
476 }
477
478 UnguardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
479 super(owner, maxCapacity, ratioInterval, chunkSize);
480 handle = new LocalPoolHandle<>(this);
481 }
482
483 UnguardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
484 super(maxCapacity, ratioInterval, chunkSize);
485 handle = new LocalPoolHandle<>(this);
486 }
487
488 @Override
489 public T getWith(Recycler<T> recycler) {
490 T obj = acquire();
491 if (obj == null) {
492 obj = recycler.newObject(canAllocatePooled()? handle : (Handle<T>) NOOP_HANDLE);
493 }
494 return obj;
495 }
496 }
497
498 private abstract static class LocalPool<H, T> {
499 private final int ratioInterval;
500 private final H[] batch;
501 private int batchSize;
502 private Thread owner;
503 private MessagePassingQueue<H> pooledHandles;
504 private int ratioCounter;
505
506 LocalPool(int maxCapacity) {
507
508
509
510 this.ratioInterval = maxCapacity == 0? -1 : 0;
511 this.owner = null;
512 batch = null;
513 batchSize = 0;
514 pooledHandles = createExternalMcPool(maxCapacity);
515 ratioCounter = 0;
516 }
517
518 @SuppressWarnings("unchecked")
519 LocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
520 this.ratioInterval = ratioInterval;
521 this.owner = owner;
522 batch = owner != null? (H[]) new Object[chunkSize] : null;
523 batchSize = 0;
524 pooledHandles = createExternalScPool(chunkSize, maxCapacity);
525 ratioCounter = ratioInterval;
526 }
527
528 private static <H> MessagePassingQueue<H> createExternalMcPool(int maxCapacity) {
529 if (maxCapacity == 0) {
530 return null;
531 }
532 if (BLOCKING_POOL) {
533 return new BlockingMessageQueue<>(maxCapacity);
534 }
535 return (MessagePassingQueue<H>) newFixedMpmcQueue(maxCapacity);
536 }
537
538 private static <H> MessagePassingQueue<H> createExternalScPool(int chunkSize, int maxCapacity) {
539 if (maxCapacity == 0) {
540 return null;
541 }
542 if (BLOCKING_POOL) {
543 return new BlockingMessageQueue<>(maxCapacity);
544 }
545 return (MessagePassingQueue<H>) newMpscQueue(chunkSize, maxCapacity);
546 }
547
548 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
549 this(!BATCH_FAST_TL_ONLY || FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()
550 ? Thread.currentThread() : null, maxCapacity, ratioInterval, chunkSize);
551 }
552
553 protected final H acquire() {
554 int size = batchSize;
555 if (size == 0) {
556
557 final MessagePassingQueue<H> handles = pooledHandles;
558 if (handles == null) {
559 return null;
560 }
561 return handles.relaxedPoll();
562 }
563 int top = size - 1;
564 final H h = batch[top];
565 batchSize = top;
566 batch[top] = null;
567 return h;
568 }
569
570 protected final void release(H handle) {
571 Thread owner = this.owner;
572 if (owner != null && Thread.currentThread() == owner && batchSize < batch.length) {
573 batch[batchSize] = handle;
574 batchSize++;
575 } else if (owner != null && isTerminated(owner)) {
576 pooledHandles = null;
577 this.owner = null;
578 } else {
579 MessagePassingQueue<H> handles = pooledHandles;
580 if (handles != null) {
581 handles.relaxedOffer(handle);
582 }
583 }
584 }
585
586 private static boolean isTerminated(Thread owner) {
587
588
589 return PlatformDependent.isJ9Jvm()? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
590 }
591
592 boolean canAllocatePooled() {
593 if (ratioInterval < 0) {
594 return false;
595 }
596 if (ratioInterval == 0) {
597 return true;
598 }
599 if (++ratioCounter >= ratioInterval) {
600 ratioCounter = 0;
601 return true;
602 }
603 return false;
604 }
605
606 abstract T getWith(Recycler<T> recycler);
607
608 int size() {
609 MessagePassingQueue<H> handles = pooledHandles;
610 final int externalSize = handles != null? handles.size() : 0;
611 return externalSize + (batch != null? batchSize : 0);
612 }
613 }
614
615
616
617
618
619
620
621 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
622 private final Queue<T> deque;
623 private final int maxCapacity;
624
625 BlockingMessageQueue(int maxCapacity) {
626 this.maxCapacity = maxCapacity;
627
628
629
630
631
632
633
634
635
636 deque = new ArrayDeque<T>();
637 }
638
639 @Override
640 public synchronized boolean offer(T e) {
641 if (deque.size() == maxCapacity) {
642 return false;
643 }
644 return deque.offer(e);
645 }
646
647 @Override
648 public synchronized T poll() {
649 return deque.poll();
650 }
651
652 @Override
653 public synchronized T peek() {
654 return deque.peek();
655 }
656
657 @Override
658 public synchronized int size() {
659 return deque.size();
660 }
661
662 @Override
663 public synchronized void clear() {
664 deque.clear();
665 }
666
667 @Override
668 public synchronized boolean isEmpty() {
669 return deque.isEmpty();
670 }
671
672 @Override
673 public int capacity() {
674 return maxCapacity;
675 }
676
677 @Override
678 public boolean relaxedOffer(T e) {
679 return offer(e);
680 }
681
682 @Override
683 public T relaxedPoll() {
684 return poll();
685 }
686
687 @Override
688 public T relaxedPeek() {
689 return peek();
690 }
691
692 @Override
693 public int drain(Consumer<T> c, int limit) {
694 T obj;
695 int i = 0;
696 for (; i < limit && (obj = poll()) != null; i++) {
697 c.accept(obj);
698 }
699 return i;
700 }
701
702 @Override
703 public int fill(Supplier<T> s, int limit) {
704 throw new UnsupportedOperationException();
705 }
706
707 @Override
708 public int drain(Consumer<T> c) {
709 throw new UnsupportedOperationException();
710 }
711
712 @Override
713 public int fill(Supplier<T> s) {
714 throw new UnsupportedOperationException();
715 }
716
717 @Override
718 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
719 throw new UnsupportedOperationException();
720 }
721
722 @Override
723 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
724 throw new UnsupportedOperationException();
725 }
726 }
727 }