View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util;
17  
18  import io.netty5.util.concurrent.FastThreadLocal;
19  import io.netty5.util.internal.ObjectPool;
20  import io.netty5.util.internal.PlatformDependent;
21  import io.netty5.util.internal.SystemPropertyUtil;
22  import io.netty5.util.internal.logging.InternalLogger;
23  import io.netty5.util.internal.logging.InternalLoggerFactory;
24  import org.jctools.queues.MessagePassingQueue;
25  
26  import java.util.ArrayDeque;
27  import java.util.Queue;
28  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29  
30  import static io.netty5.util.internal.PlatformDependent.newMpscQueue;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
41      private static final Handle<?> NOOP_HANDLE = new Handle<>() {
42          @Override
43          public void recycle(Object object) {
44              // NOOP
45          }
46  
47          @Override
48          public String toString() {
49              return "NOOP_HANDLE";
50          }
51      };
52      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
53      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
54      private static final int RATIO;
55      private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
56      private static final boolean BLOCKING_POOL;
57  
58      static {
59          // In the future, we might have different maxCapacity for different object types.
60          // e.g. io.netty5.recycler.maxCapacity.writeTask
61          //      io.netty5.recycler.maxCapacity.outboundBuffer
62          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty5.recycler.maxCapacityPerThread",
63                  SystemPropertyUtil.getInt("io.netty5.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
64          if (maxCapacityPerThread < 0) {
65              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
66          }
67  
68          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
69          DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty5.recycler.chunkSize", 32);
70  
71          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
72          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
73          // bursts.
74          RATIO = max(0, SystemPropertyUtil.getInt("io.netty5.recycler.ratio", 8));
75  
76          BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty5.recycler.blocking", false);
77  
78          if (logger.isDebugEnabled()) {
79              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
80                  logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: disabled");
81                  logger.debug("-Dio.netty5.recycler.ratio: disabled");
82                  logger.debug("-Dio.netty5.recycler.chunkSize: disabled");
83                  logger.debug("-Dio.netty5.recycler.blocking: disabled");
84              } else {
85                  logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
86                  logger.debug("-Dio.netty5.recycler.ratio: {}", RATIO);
87                  logger.debug("-Dio.netty5.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
88                  logger.debug("-Dio.netty5.recycler.blocking: {}", BLOCKING_POOL);
89              }
90          }
91      }
92  
93      private final int maxCapacityPerThread;
94      private final int interval;
95      private final int chunkSize;
96      private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<>() {
97          @Override
98          protected LocalPool<T> initialValue() {
99              return new LocalPool<>(maxCapacityPerThread, interval, chunkSize);
100         }
101 
102         @Override
103         protected void onRemoval(LocalPool<T> value) throws Exception {
104             super.onRemoval(value);
105             MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
106             value.pooledHandles = null;
107             handles.clear();
108         }
109     };
110 
111     protected Recycler() {
112         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
113     }
114 
115     protected Recycler(int maxCapacityPerThread) {
116         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
117     }
118 
119     /**
120      * @deprecated Use one of the following instead:
121      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
122      */
123     @Deprecated
124     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
125     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
126         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
127     }
128 
129     /**
130      * @deprecated Use one of the following instead:
131      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
132      */
133     @Deprecated
134     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
135     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
136                        int ratio, int maxDelayedQueuesPerThread) {
137         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
138     }
139 
140     /**
141      * @deprecated Use one of the following instead:
142      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
143      */
144     @Deprecated
145     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
146     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
147                        int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
148         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
149     }
150 
151     protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
152         interval = max(0, ratio);
153         if (maxCapacityPerThread <= 0) {
154             this.maxCapacityPerThread = 0;
155             this.chunkSize = 0;
156         } else {
157             this.maxCapacityPerThread = max(4, maxCapacityPerThread);
158             this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
159         }
160     }
161 
162     @SuppressWarnings("unchecked")
163     public final T get() {
164         if (maxCapacityPerThread == 0) {
165             return newObject((Handle<T>) NOOP_HANDLE);
166         }
167         LocalPool<T> localPool = threadLocal.get();
168         DefaultHandle<T> handle = localPool.claim();
169         T obj;
170         if (handle == null) {
171             handle = localPool.newHandle();
172             if (handle != null) {
173                 obj = newObject(handle);
174                 handle.set(obj);
175             } else {
176                 obj = newObject((Handle<T>) NOOP_HANDLE);
177             }
178         } else {
179             obj = handle.get();
180         }
181 
182         return obj;
183     }
184 
185     /**
186      * @deprecated use {@link Handle#recycle(Object)}.
187      */
188     @Deprecated
189     public final boolean recycle(T o, Handle<T> handle) {
190         if (handle == NOOP_HANDLE) {
191             return false;
192         }
193 
194         handle.recycle(o);
195         return true;
196     }
197 
198     final int threadLocalSize() {
199         return threadLocal.get().pooledHandles.size();
200     }
201 
202     protected abstract T newObject(Handle<T> handle);
203 
204     @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
205     public interface Handle<T> extends ObjectPool.Handle<T>  { }
206 
207     private static final class DefaultHandle<T> implements Handle<T> {
208         private static final int STATE_CLAIMED = 0;
209         private static final int STATE_AVAILABLE = 1;
210         private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
211         static {
212             AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
213             //noinspection unchecked
214             STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
215         }
216 
217         @SuppressWarnings({"FieldMayBeFinal", "unused"}) // Updated by STATE_UPDATER.
218         private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
219         private final LocalPool<T> localPool;
220         private T value;
221 
222         DefaultHandle(LocalPool<T> localPool) {
223             this.localPool = localPool;
224         }
225 
226         @Override
227         public void recycle(Object object) {
228             if (object != value) {
229                 throw new IllegalArgumentException("object does not belong to handle");
230             }
231             localPool.release(this);
232         }
233 
234         T get() {
235             return value;
236         }
237 
238         void set(T value) {
239             this.value = value;
240         }
241 
242         boolean availableToClaim() {
243             if (state != STATE_AVAILABLE) {
244                 return false;
245             }
246             return STATE_UPDATER.compareAndSet(this, STATE_AVAILABLE, STATE_CLAIMED);
247         }
248 
249         void toAvailable() {
250             int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
251             if (prev == STATE_AVAILABLE) {
252                 throw new IllegalStateException("Object has been recycled already.");
253             }
254         }
255     }
256 
257     private static final class LocalPool<T> {
258         private final int ratioInterval;
259         private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
260         private int ratioCounter;
261 
262         @SuppressWarnings("unchecked")
263         LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
264             this.ratioInterval = ratioInterval;
265             if (BLOCKING_POOL) {
266                 pooledHandles = new BlockingMessageQueue<>(maxCapacity);
267             } else {
268                 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
269             }
270             ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
271         }
272 
273         DefaultHandle<T> claim() {
274             MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
275             if (handles == null) {
276                 return null;
277             }
278             DefaultHandle<T> handle;
279             do {
280                 handle = handles.relaxedPoll();
281             } while (handle != null && !handle.availableToClaim());
282             return handle;
283         }
284 
285         void release(DefaultHandle<T> handle) {
286             MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
287             handle.toAvailable();
288             if (handles != null) {
289                 handles.relaxedOffer(handle);
290             }
291         }
292 
293         DefaultHandle<T> newHandle() {
294             if (++ratioCounter >= ratioInterval) {
295                 ratioCounter = 0;
296                 return new DefaultHandle<>(this);
297             }
298             return null;
299         }
300     }
301 
302     /**
303      * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
304      * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
305      * The implementation relies on synchronised monitor locks for thread-safety.
306      * The {@code drain} and {@code fill} bulk operations are not supported by this implementation.
307      */
308     private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
309         private final Queue<T> deque;
310         private final int maxCapacity;
311 
312         BlockingMessageQueue(int maxCapacity) {
313             this.maxCapacity = maxCapacity;
314             // This message passing queue is backed by an ArrayDeque instance,
315             // made thread-safe by synchronising on `this` BlockingMessageQueue instance.
316             // Why ArrayDeque?
317             // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
318             // We use ArrayDeque instead of ArrayList because we need the queue APIs.
319             // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
320             // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
321             // and these queues will usually have large capacities, in potentially great numbers (one per thread),
322             // but often only have comparatively few items in them.
323             deque = new ArrayDeque<>();
324         }
325 
326         @Override
327         public synchronized boolean offer(T e) {
328             if (deque.size() == maxCapacity) {
329                 return false;
330             }
331             return deque.offer(e);
332         }
333 
334         @Override
335         public synchronized T poll() {
336             return deque.poll();
337         }
338 
339         @Override
340         public synchronized T peek() {
341             return deque.peek();
342         }
343 
344         @Override
345         public synchronized int size() {
346             return deque.size();
347         }
348 
349         @Override
350         public synchronized void clear() {
351             deque.clear();
352         }
353 
354         @Override
355         public synchronized boolean isEmpty() {
356             return deque.isEmpty();
357         }
358 
359         @Override
360         public int capacity() {
361             return maxCapacity;
362         }
363 
364         @Override
365         public boolean relaxedOffer(T e) {
366             return offer(e);
367         }
368 
369         @Override
370         public T relaxedPoll() {
371             return poll();
372         }
373 
374         @Override
375         public T relaxedPeek() {
376             return peek();
377         }
378 
379         @Override
380         public int drain(Consumer<T> c, int limit) {
381             throw new UnsupportedOperationException();
382         }
383 
384         @Override
385         public int fill(Supplier<T> s, int limit) {
386             throw new UnsupportedOperationException();
387         }
388 
389         @Override
390         public int drain(Consumer<T> c) {
391             throw new UnsupportedOperationException();
392         }
393 
394         @Override
395         public int fill(Supplier<T> s) {
396             throw new UnsupportedOperationException();
397         }
398 
399         @Override
400         public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
401             throw new UnsupportedOperationException();
402         }
403 
404         @Override
405         public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
406             throw new UnsupportedOperationException();
407         }
408     }
409 }