View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.concurrent.FastThreadLocal;
19  import io.netty.util.concurrent.FastThreadLocalThread;
20  import io.netty.util.internal.ObjectPool;
21  import io.netty.util.internal.PlatformDependent;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  import org.jctools.queues.MessagePassingQueue;
27  import org.jetbrains.annotations.VisibleForTesting;
28  
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32  
33  import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34  import static java.lang.Math.max;
35  import static java.lang.Math.min;
36  
37  /**
38   * Light-weight object pool based on a thread-local stack.
39   *
40   * @param <T> the type of the pooled object
41   */
42  public abstract class Recycler<T> {
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44      private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49  
50          @Override
51          public void unguardedRecycle(final Object object) {
52              // NOOP
53          }
54  
55          @Override
56          public String toString() {
57              return "NOOP_HANDLE";
58          }
59      };
60      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
61      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
62      private static final int RATIO;
63      private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
64      private static final boolean BLOCKING_POOL;
65      private static final boolean BATCH_FAST_TL_ONLY;
66  
67      static {
68          // In the future, we might have different maxCapacity for different object types.
69          // e.g. io.netty.recycler.maxCapacity.writeTask
70          //      io.netty.recycler.maxCapacity.outboundBuffer
71          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
72                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
73          if (maxCapacityPerThread < 0) {
74              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
75          }
76  
77          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
78          DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
79  
80          // By default, we allow one push to a Recycler for each 8th try on handles that were never recycled before.
81          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
82          // bursts.
83          RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
84  
85          BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
86          BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
87  
88          if (logger.isDebugEnabled()) {
89              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
90                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
91                  logger.debug("-Dio.netty.recycler.ratio: disabled");
92                  logger.debug("-Dio.netty.recycler.chunkSize: disabled");
93                  logger.debug("-Dio.netty.recycler.blocking: disabled");
94                  logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
98                  logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
99                  logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
100                 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
101             }
102         }
103     }
104 
105     private final int maxCapacityPerThread;
106     private final int interval;
107     private final int chunkSize;
108     private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109         @Override
110         protected LocalPool<T> initialValue() {
111             return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112         }
113 
114         @Override
115         protected void onRemoval(LocalPool<T> value) throws Exception {
116             super.onRemoval(value);
117             MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118             value.pooledHandles = null;
119             value.owner = null;
120             handles.clear();
121         }
122     };
123 
124     protected Recycler() {
125         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126     }
127 
128     protected Recycler(int maxCapacityPerThread) {
129         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130     }
131 
132     /**
133      * @deprecated Use one of the following instead:
134      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
135      */
136     @Deprecated
137     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
138     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140     }
141 
142     /**
143      * @deprecated Use one of the following instead:
144      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
145      */
146     @Deprecated
147     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
148     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149                        int ratio, int maxDelayedQueuesPerThread) {
150         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151     }
152 
153     /**
154      * @deprecated Use one of the following instead:
155      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
156      */
157     @Deprecated
158     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
159     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160                        int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162     }
163 
164     protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165         interval = max(0, ratio);
166         if (maxCapacityPerThread <= 0) {
167             this.maxCapacityPerThread = 0;
168             this.chunkSize = 0;
169         } else {
170             this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171             this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172         }
173     }
174 
175     @SuppressWarnings("unchecked")
176     public final T get() {
177         if (maxCapacityPerThread == 0 ||
178                 (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
179                         !FastThreadLocalThread.currentThreadHasFastThreadLocal())) {
180             return newObject((Handle<T>) NOOP_HANDLE);
181         }
182         LocalPool<T> localPool = threadLocal.get();
183         DefaultHandle<T> handle = localPool.claim();
184         T obj;
185         if (handle == null) {
186             handle = localPool.newHandle();
187             if (handle != null) {
188                 obj = newObject(handle);
189                 handle.set(obj);
190             } else {
191                 obj = newObject((Handle<T>) NOOP_HANDLE);
192             }
193         } else {
194             obj = handle.get();
195         }
196 
197         return obj;
198     }
199 
200     /**
201      * @deprecated use {@link Handle#recycle(Object)}.
202      */
203     @Deprecated
204     public final boolean recycle(T o, Handle<T> handle) {
205         if (handle == NOOP_HANDLE) {
206             return false;
207         }
208 
209         handle.recycle(o);
210         return true;
211     }
212 
213     @VisibleForTesting
214     final int threadLocalSize() {
215         if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
216                 !FastThreadLocalThread.currentThreadHasFastThreadLocal()) {
217             return 0;
218         }
219         LocalPool<T> localPool = threadLocal.getIfExists();
220         return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
221     }
222 
    /**
     * Creates a new instance of the pooled type. {@link #get()} calls this whenever it has no recycled instance
     * to hand out.
     *
     * @param handle can NOT be null. It is either a handle tracking the new object or the shared no-op handle.
     * @return the newly created object
     */
    protected abstract T newObject(Handle<T> handle);

    /**
     * Handle type passed to {@link #newObject(Handle)}. It adds no members of its own to
     * {@link ObjectPool.Handle}.
     */
    @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
    public interface Handle<T> extends ObjectPool.Handle<T>  { }
230 
231     @UnstableApi
232     public abstract static class EnhancedHandle<T> implements Handle<T> {
233 
234         public abstract void unguardedRecycle(Object object);
235 
236         private EnhancedHandle() {
237         }
238     }
239 
240     private static final class DefaultHandle<T> extends EnhancedHandle<T> {
241         private static final int STATE_CLAIMED = 0;
242         private static final int STATE_AVAILABLE = 1;
243         private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
244         static {
245             AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
246             //noinspection unchecked
247             STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
248         }
249 
250         private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
251         private final LocalPool<T> localPool;
252         private T value;
253 
254         DefaultHandle(LocalPool<T> localPool) {
255             this.localPool = localPool;
256         }
257 
258         @Override
259         public void recycle(Object object) {
260             if (object != value) {
261                 throw new IllegalArgumentException("object does not belong to handle");
262             }
263             localPool.release(this, true);
264         }
265 
266         @Override
267         public void unguardedRecycle(Object object) {
268             if (object != value) {
269                 throw new IllegalArgumentException("object does not belong to handle");
270             }
271             localPool.release(this, false);
272         }
273 
274         T get() {
275             return value;
276         }
277 
278         void set(T value) {
279             this.value = value;
280         }
281 
282         void toClaimed() {
283             assert state == STATE_AVAILABLE;
284             STATE_UPDATER.lazySet(this, STATE_CLAIMED);
285         }
286 
287         void toAvailable() {
288             int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
289             if (prev == STATE_AVAILABLE) {
290                 throw new IllegalStateException("Object has been recycled already.");
291             }
292         }
293 
294         void unguardedToAvailable() {
295             int prev = state;
296             if (prev == STATE_AVAILABLE) {
297                 throw new IllegalStateException("Object has been recycled already.");
298             }
299             STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
300         }
301     }
302 
303     private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
304         private final int ratioInterval;
305         private final int chunkSize;
306         private final ArrayDeque<DefaultHandle<T>> batch;
307         private volatile Thread owner;
308         private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
309         private int ratioCounter;
310 
311         @SuppressWarnings("unchecked")
312         LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
313             this.ratioInterval = ratioInterval;
314             this.chunkSize = chunkSize;
315             batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
316             Thread currentThread = Thread.currentThread();
317             owner = !BATCH_FAST_TL_ONLY || FastThreadLocalThread.currentThreadHasFastThreadLocal()
318                     ? currentThread : null;
319             if (BLOCKING_POOL) {
320                 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
321             } else {
322                 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
323             }
324             ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
325         }
326 
327         DefaultHandle<T> claim() {
328             MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
329             if (handles == null) {
330                 return null;
331             }
332             if (batch.isEmpty()) {
333                 handles.drain(this, chunkSize);
334             }
335             DefaultHandle<T> handle = batch.pollLast();
336             if (null != handle) {
337                 handle.toClaimed();
338             }
339             return handle;
340         }
341 
342         void release(DefaultHandle<T> handle, boolean guarded) {
343             if (guarded) {
344                 handle.toAvailable();
345             } else {
346                 handle.unguardedToAvailable();
347             }
348             Thread owner = this.owner;
349             if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
350                 accept(handle);
351             } else if (owner != null && isTerminated(owner)) {
352                 this.owner = null;
353                 pooledHandles = null;
354             } else {
355                 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
356                 if (handles != null) {
357                     handles.relaxedOffer(handle);
358                 }
359             }
360         }
361 
362         private static boolean isTerminated(Thread owner) {
363             // Do not use `Thread.getState()` in J9 JVM because it's known to have a performance issue.
364             // See: https://github.com/netty/netty/issues/13347#issuecomment-1518537895
365             return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
366         }
367 
368         DefaultHandle<T> newHandle() {
369             if (++ratioCounter >= ratioInterval) {
370                 ratioCounter = 0;
371                 return new DefaultHandle<T>(this);
372             }
373             return null;
374         }
375 
376         @Override
377         public void accept(DefaultHandle<T> e) {
378             batch.addLast(e);
379         }
380     }
381 
382     /**
383      * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
384      * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
385      * The implementation relies on synchronised monitor locks for thread-safety.
386      * The {@code fill} bulk operation is not supported by this implementation.
387      */
388     private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
389         private final Queue<T> deque;
390         private final int maxCapacity;
391 
392         BlockingMessageQueue(int maxCapacity) {
393             this.maxCapacity = maxCapacity;
394             // This message passing queue is backed by an ArrayDeque instance,
395             // made thread-safe by synchronising on `this` BlockingMessageQueue instance.
396             // Why ArrayDeque?
397             // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
398             // We use ArrayDeque instead of ArrayList because we need the queue APIs.
399             // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
400             // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
401             // and these queues will usually have large capacities, in potentially great numbers (one per thread),
402             // but often only have comparatively few items in them.
403             deque = new ArrayDeque<T>();
404         }
405 
406         @Override
407         public synchronized boolean offer(T e) {
408             if (deque.size() == maxCapacity) {
409                 return false;
410             }
411             return deque.offer(e);
412         }
413 
414         @Override
415         public synchronized T poll() {
416             return deque.poll();
417         }
418 
419         @Override
420         public synchronized T peek() {
421             return deque.peek();
422         }
423 
424         @Override
425         public synchronized int size() {
426             return deque.size();
427         }
428 
429         @Override
430         public synchronized void clear() {
431             deque.clear();
432         }
433 
434         @Override
435         public synchronized boolean isEmpty() {
436             return deque.isEmpty();
437         }
438 
439         @Override
440         public int capacity() {
441             return maxCapacity;
442         }
443 
444         @Override
445         public boolean relaxedOffer(T e) {
446             return offer(e);
447         }
448 
449         @Override
450         public T relaxedPoll() {
451             return poll();
452         }
453 
454         @Override
455         public T relaxedPeek() {
456             return peek();
457         }
458 
459         @Override
460         public int drain(Consumer<T> c, int limit) {
461             T obj;
462             int i = 0;
463             for (; i < limit && (obj = poll()) != null; i++) {
464                 c.accept(obj);
465             }
466             return i;
467         }
468 
469         @Override
470         public int fill(Supplier<T> s, int limit) {
471             throw new UnsupportedOperationException();
472         }
473 
474         @Override
475         public int drain(Consumer<T> c) {
476             throw new UnsupportedOperationException();
477         }
478 
479         @Override
480         public int fill(Supplier<T> s) {
481             throw new UnsupportedOperationException();
482         }
483 
484         @Override
485         public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
486             throw new UnsupportedOperationException();
487         }
488 
489         @Override
490         public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
491             throw new UnsupportedOperationException();
492         }
493     }
494 }