View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.concurrent.FastThreadLocal;
19  import io.netty.util.concurrent.FastThreadLocalThread;
20  import io.netty.util.internal.ObjectPool;
21  import io.netty.util.internal.PlatformDependent;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  import org.jctools.queues.MessagePassingQueue;
27  import org.jetbrains.annotations.VisibleForTesting;
28  
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32  
33  import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34  import static java.lang.Math.max;
35  import static java.lang.Math.min;
36  
37  /**
38   * Light-weight object pool based on a thread-local stack.
39   *
40   * @param <T> the type of the pooled object
41   */
42  public abstract class Recycler<T> {
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44      private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49  
50          @Override
51          public void unguardedRecycle(final Object object) {
52              // NOOP
53          }
54  
55          @Override
56          public String toString() {
57              return "NOOP_HANDLE";
58          }
59      };
60      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
61      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
62      private static final int RATIO;
63      private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
64      private static final boolean BLOCKING_POOL;
65      private static final boolean BATCH_FAST_TL_ONLY;
66  
67      static {
68          // In the future, we might have different maxCapacity for different object types.
69          // e.g. io.netty.recycler.maxCapacity.writeTask
70          //      io.netty.recycler.maxCapacity.outboundBuffer
71          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
72                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
73          if (maxCapacityPerThread < 0) {
74              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
75          }
76  
77          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
78          DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
79  
80          // By default, we allow one push to a Recycler for each 8th try on handles that were never recycled before.
81          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
82          // bursts.
83          RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
84  
85          BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
86          BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
87  
88          if (logger.isDebugEnabled()) {
89              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
90                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
91                  logger.debug("-Dio.netty.recycler.ratio: disabled");
92                  logger.debug("-Dio.netty.recycler.chunkSize: disabled");
93                  logger.debug("-Dio.netty.recycler.blocking: disabled");
94                  logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
98                  logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
99                  logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
100                 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
101             }
102         }
103     }
104 
105     private final int maxCapacityPerThread;
106     private final int interval;
107     private final int chunkSize;
108     private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109         @Override
110         protected LocalPool<T> initialValue() {
111             return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112         }
113 
114         @Override
115         protected void onRemoval(LocalPool<T> value) throws Exception {
116             super.onRemoval(value);
117             MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118             value.pooledHandles = null;
119             value.owner = null;
120             handles.clear();
121         }
122     };
123 
124     protected Recycler() {
125         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126     }
127 
128     protected Recycler(int maxCapacityPerThread) {
129         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130     }
131 
132     /**
133      * @deprecated Use one of the following instead:
134      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
135      */
136     @Deprecated
137     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
138     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140     }
141 
142     /**
143      * @deprecated Use one of the following instead:
144      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
145      */
146     @Deprecated
147     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
148     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149                        int ratio, int maxDelayedQueuesPerThread) {
150         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151     }
152 
153     /**
154      * @deprecated Use one of the following instead:
155      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
156      */
157     @Deprecated
158     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
159     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160                        int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162     }
163 
164     protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165         interval = max(0, ratio);
166         if (maxCapacityPerThread <= 0) {
167             this.maxCapacityPerThread = 0;
168             this.chunkSize = 0;
169         } else {
170             this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171             this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172         }
173     }
174 
175     @SuppressWarnings("unchecked")
176     public final T get() {
177         if (maxCapacityPerThread == 0 || PlatformDependent.isVirtualThread(Thread.currentThread())) {
178             return newObject((Handle<T>) NOOP_HANDLE);
179         }
180         LocalPool<T> localPool = threadLocal.get();
181         DefaultHandle<T> handle = localPool.claim();
182         T obj;
183         if (handle == null) {
184             handle = localPool.newHandle();
185             if (handle != null) {
186                 obj = newObject(handle);
187                 handle.set(obj);
188             } else {
189                 obj = newObject((Handle<T>) NOOP_HANDLE);
190             }
191         } else {
192             obj = handle.get();
193         }
194 
195         return obj;
196     }
197 
198     /**
199      * @deprecated use {@link Handle#recycle(Object)}.
200      */
201     @Deprecated
202     public final boolean recycle(T o, Handle<T> handle) {
203         if (handle == NOOP_HANDLE) {
204             return false;
205         }
206 
207         handle.recycle(o);
208         return true;
209     }
210 
211     @VisibleForTesting
212     final int threadLocalSize() {
213         if (PlatformDependent.isVirtualThread(Thread.currentThread())) {
214             return 0;
215         }
216         LocalPool<T> localPool = threadLocal.getIfExists();
217         return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
218     }
219 
220     /**
221      * @param handle can NOT be null.
222      */
223     protected abstract T newObject(Handle<T> handle);
224 
225     @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
226     public interface Handle<T> extends ObjectPool.Handle<T>  { }
227 
228     @UnstableApi
229     public abstract static class EnhancedHandle<T> implements Handle<T> {
230 
231         public abstract void unguardedRecycle(Object object);
232 
233         private EnhancedHandle() {
234         }
235     }
236 
237     private static final class DefaultHandle<T> extends EnhancedHandle<T> {
238         private static final int STATE_CLAIMED = 0;
239         private static final int STATE_AVAILABLE = 1;
240         private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
241         static {
242             AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
243             //noinspection unchecked
244             STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
245         }
246 
247         private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
248         private final LocalPool<T> localPool;
249         private T value;
250 
251         DefaultHandle(LocalPool<T> localPool) {
252             this.localPool = localPool;
253         }
254 
255         @Override
256         public void recycle(Object object) {
257             if (object != value) {
258                 throw new IllegalArgumentException("object does not belong to handle");
259             }
260             localPool.release(this, true);
261         }
262 
263         @Override
264         public void unguardedRecycle(Object object) {
265             if (object != value) {
266                 throw new IllegalArgumentException("object does not belong to handle");
267             }
268             localPool.release(this, false);
269         }
270 
271         T get() {
272             return value;
273         }
274 
275         void set(T value) {
276             this.value = value;
277         }
278 
279         void toClaimed() {
280             assert state == STATE_AVAILABLE;
281             STATE_UPDATER.lazySet(this, STATE_CLAIMED);
282         }
283 
284         void toAvailable() {
285             int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
286             if (prev == STATE_AVAILABLE) {
287                 throw new IllegalStateException("Object has been recycled already.");
288             }
289         }
290 
291         void unguardedToAvailable() {
292             int prev = state;
293             if (prev == STATE_AVAILABLE) {
294                 throw new IllegalStateException("Object has been recycled already.");
295             }
296             STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
297         }
298     }
299 
300     private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
301         private final int ratioInterval;
302         private final int chunkSize;
303         private final ArrayDeque<DefaultHandle<T>> batch;
304         private volatile Thread owner;
305         private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
306         private int ratioCounter;
307 
308         @SuppressWarnings("unchecked")
309         LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
310             this.ratioInterval = ratioInterval;
311             this.chunkSize = chunkSize;
312             batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
313             Thread currentThread = Thread.currentThread();
314             owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;
315             if (BLOCKING_POOL) {
316                 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
317             } else {
318                 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
319             }
320             ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
321         }
322 
323         DefaultHandle<T> claim() {
324             MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
325             if (handles == null) {
326                 return null;
327             }
328             if (batch.isEmpty()) {
329                 handles.drain(this, chunkSize);
330             }
331             DefaultHandle<T> handle = batch.pollLast();
332             if (null != handle) {
333                 handle.toClaimed();
334             }
335             return handle;
336         }
337 
338         void release(DefaultHandle<T> handle, boolean guarded) {
339             if (guarded) {
340                 handle.toAvailable();
341             } else {
342                 handle.unguardedToAvailable();
343             }
344             Thread owner = this.owner;
345             if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
346                 accept(handle);
347             } else if (owner != null && isTerminated(owner)) {
348                 this.owner = null;
349                 pooledHandles = null;
350             } else {
351                 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
352                 if (handles != null) {
353                     handles.relaxedOffer(handle);
354                 }
355             }
356         }
357 
358         private static boolean isTerminated(Thread owner) {
359             // Do not use `Thread.getState()` in J9 JVM because it's known to have a performance issue.
360             // See: https://github.com/netty/netty/issues/13347#issuecomment-1518537895
361             return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
362         }
363 
364         DefaultHandle<T> newHandle() {
365             if (++ratioCounter >= ratioInterval) {
366                 ratioCounter = 0;
367                 return new DefaultHandle<T>(this);
368             }
369             return null;
370         }
371 
372         @Override
373         public void accept(DefaultHandle<T> e) {
374             batch.addLast(e);
375         }
376     }
377 
378     /**
379      * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
380      * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
381      * The implementation relies on synchronised monitor locks for thread-safety.
382      * The {@code fill} bulk operation is not supported by this implementation.
383      */
384     private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
385         private final Queue<T> deque;
386         private final int maxCapacity;
387 
388         BlockingMessageQueue(int maxCapacity) {
389             this.maxCapacity = maxCapacity;
390             // This message passing queue is backed by an ArrayDeque instance,
391             // made thread-safe by synchronising on `this` BlockingMessageQueue instance.
392             // Why ArrayDeque?
393             // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
394             // We use ArrayDeque instead of ArrayList because we need the queue APIs.
395             // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
396             // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
397             // and these queues will usually have large capacities, in potentially great numbers (one per thread),
398             // but often only have comparatively few items in them.
399             deque = new ArrayDeque<T>();
400         }
401 
402         @Override
403         public synchronized boolean offer(T e) {
404             if (deque.size() == maxCapacity) {
405                 return false;
406             }
407             return deque.offer(e);
408         }
409 
410         @Override
411         public synchronized T poll() {
412             return deque.poll();
413         }
414 
415         @Override
416         public synchronized T peek() {
417             return deque.peek();
418         }
419 
420         @Override
421         public synchronized int size() {
422             return deque.size();
423         }
424 
425         @Override
426         public synchronized void clear() {
427             deque.clear();
428         }
429 
430         @Override
431         public synchronized boolean isEmpty() {
432             return deque.isEmpty();
433         }
434 
435         @Override
436         public int capacity() {
437             return maxCapacity;
438         }
439 
440         @Override
441         public boolean relaxedOffer(T e) {
442             return offer(e);
443         }
444 
445         @Override
446         public T relaxedPoll() {
447             return poll();
448         }
449 
450         @Override
451         public T relaxedPeek() {
452             return peek();
453         }
454 
455         @Override
456         public int drain(Consumer<T> c, int limit) {
457             T obj;
458             int i = 0;
459             for (; i < limit && (obj = poll()) != null; i++) {
460                 c.accept(obj);
461             }
462             return i;
463         }
464 
465         @Override
466         public int fill(Supplier<T> s, int limit) {
467             throw new UnsupportedOperationException();
468         }
469 
470         @Override
471         public int drain(Consumer<T> c) {
472             throw new UnsupportedOperationException();
473         }
474 
475         @Override
476         public int fill(Supplier<T> s) {
477             throw new UnsupportedOperationException();
478         }
479 
480         @Override
481         public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
482             throw new UnsupportedOperationException();
483         }
484 
485         @Override
486         public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
487             throw new UnsupportedOperationException();
488         }
489     }
490 }