View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.concurrent.FastThreadLocal;
19  import io.netty.util.concurrent.FastThreadLocalThread;
20  import io.netty.util.internal.ObjectPool;
21  import io.netty.util.internal.PlatformDependent;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  import org.jctools.queues.MessagePassingQueue;
27  import org.jetbrains.annotations.VisibleForTesting;
28  
29  import java.util.ArrayDeque;
30  import java.util.Objects;
31  import java.util.Queue;
32  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33  
34  import static io.netty.util.internal.PlatformDependent.newFixedMpmcQueue;
35  import static io.netty.util.internal.PlatformDependent.newMpscQueue;
36  import static java.lang.Math.max;
37  import static java.lang.Math.min;
38  
39  /**
40   * Light-weight object pool based on a thread-local stack.
41   *
42   * @param <T> the type of the pooled object
43   */
44  public abstract class Recycler<T> {
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
46  
47      /**
48       * We created this handle to avoid having more than 2 concrete implementations of {@link EnhancedHandle}
49       * i.e. NOOP_HANDLE, {@link DefaultHandle} and the one used in the LocalPool.
50       */
51      private static final class LocalPoolHandle<T> extends EnhancedHandle<T> {
52          private final UnguardedLocalPool<T> pool;
53  
54          private LocalPoolHandle(UnguardedLocalPool<T> pool) {
55              this.pool = pool;
56          }
57  
58          @Override
59          public void recycle(T object) {
60              UnguardedLocalPool<T> pool = this.pool;
61              if (pool != null) {
62                  pool.release(object);
63              }
64          }
65  
66          @Override
67          public void unguardedRecycle(final Object object) {
68              UnguardedLocalPool<T> pool = this.pool;
69              if (pool != null) {
70                  pool.release((T) object);
71              }
72          }
73      }
74  
    // Handle that never recycles; handed out when pooling is disabled or the ratio gate says no.
    private static final EnhancedHandle<?> NOOP_HANDLE = new LocalPoolHandle<>(null);
    // Zero-capacity pool shared by all recyclers whose capacity resolved to 0.
    private static final UnguardedLocalPool<?> NOOP_LOCAL_POOL = new UnguardedLocalPool<>(0);
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
    // The following constants are initialised from system properties in the static block below.
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    private static final int RATIO;
    private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
    private static final boolean BLOCKING_POOL;
    private static final boolean BATCH_FAST_TL_ONLY;
83  
    static {
        // In the future, we might have different maxCapacity for different object types.
        // e.g. io.netty.recycler.maxCapacity.writeTask
        //      io.netty.recycler.maxCapacity.outboundBuffer
        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            // Negative values fall back to the default; 0 (not negative) is what disables pooling.
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
        }

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
        DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);

        // By default, we allow one push to a Recycler for each 8th try on handles that were never recycled before.
        // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
        // bursts.
        RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

        // Debug aid: swap the lock-free queues for a synchronized implementation (see BlockingMessageQueue).
        BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
        BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);

        if (logger.isDebugEnabled()) {
            if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
                logger.debug("-Dio.netty.recycler.ratio: disabled");
                logger.debug("-Dio.netty.recycler.chunkSize: disabled");
                logger.debug("-Dio.netty.recycler.blocking: disabled");
                logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
            } else {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
                logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
                logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
                logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
                logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
            }
        }
    }
121 
    // Exactly one of the two is non-null: localPool when the recycler is shared/pinned,
    // threadLocalPool when per-thread storage is used (see the private master constructor).
    private final LocalPool<?, T> localPool;
    private final FastThreadLocal<LocalPool<?, T>> threadLocalPool;
124 
125     /**
126      * USE IT CAREFULLY!<br>
127      * This is creating a shareable {@link Recycler} which {@code get()} can be called concurrently from different
128      * {@link Thread}s.<br>
129      * Usually {@link Recycler}s uses some form of thread-local storage, but this constructor is disabling it
130      * and using a single pool of instances instead, sized as {@code maxCapacity}<br>
131      * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true}:
132      * it means that {@link Handle#recycle(Object)} is not checking that {@code object} is the same which was
133      * recycled and assume no other recycling happens concurrently
134      * (similar to what {@link EnhancedHandle#unguardedRecycle(Object)} does).<br>
135      */
136     protected Recycler(int maxCapacity, boolean unguarded) {
137         if (maxCapacity <= 0) {
138             maxCapacity = 0;
139         } else {
140             maxCapacity = max(4, maxCapacity);
141         }
142         threadLocalPool = null;
143         if (maxCapacity == 0) {
144             localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
145         } else {
146             localPool = unguarded? new UnguardedLocalPool<>(maxCapacity) : new GuardedLocalPool<>(maxCapacity);
147         }
148     }
149 
    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true}:
     * it means that {@link Handle#recycle(Object)} is not checking that {@code object} is the same which was
     * recycled and assume no other recycling happens concurrently
     * (similar to what {@link EnhancedHandle#unguardedRecycle(Object)} does).<br>
     */
    protected Recycler(boolean unguarded) {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD, unguarded);
    }

    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true} as stated by
     * {@link #Recycler(boolean)} and allows to pin the recycler to a specific {@link Thread}, if {@code owner}
     * is not {@code null}.
     * <p>
     * Since this method has been introduced for performance-sensitive cases it doesn't validate if {@link #get()} is
     * called from the {@code owner} {@link Thread}: it assumes {@link #get()} to never happen concurrently.
     * <p>
     */
    protected Recycler(Thread owner, boolean unguarded) {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD, owner, unguarded);
    }

    /**
     * Creates a thread-local, guarded recycler with the given per-thread capacity and the default
     * ratio and chunk size.
     */
    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
    }

    /**
     * Creates a thread-local, guarded recycler with all default settings.
     */
    protected Recycler() {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD);
    }
182 
    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true} as stated by
     * {@link #Recycler(boolean)}, but it allows to tune the chunk size used for local pooling.
     */
    protected Recycler(int chunksSize, int maxCapacityPerThread, boolean unguarded) {
        this(maxCapacityPerThread, RATIO, chunksSize, unguarded);
    }
191 
    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true} and allows pinning
     * the recycler to a specific {@link Thread}, as stated by {@link #Recycler(Thread, boolean)}.<br>
     * It also allows tuning the chunk size used for local pooling and the max capacity per thread.
     *
     * @throws NullPointerException if {@code owner} is {@code null} and {@code maxCapacityPerThread > 0}
     *         (the delegate constructor uses {@link Objects#requireNonNull(Object, String)}).
     */
    protected Recycler(int chunkSize, int maxCapacityPerThread, Thread owner, boolean unguarded) {
        this(maxCapacityPerThread, RATIO, chunkSize, owner, unguarded);
    }
203 
    /**
     * @deprecated Use one of the following instead:
     * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
     */
    @Deprecated
    @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        // maxSharedCapacityFactor is ignored; it is kept only for API compatibility.
        this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
    }

    /**
     * @deprecated Use one of the following instead:
     * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
     */
    @Deprecated
    @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread) {
        // maxSharedCapacityFactor and maxDelayedQueuesPerThread are ignored; kept for API compatibility.
        this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
    }

    /**
     * @deprecated Use one of the following instead:
     * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
     */
    @Deprecated
    @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
        // All parameters except maxCapacityPerThread and ratio are ignored; kept for API compatibility.
        this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
    }
235 
    /**
     * Creates a thread-local, guarded recycler with the given per-thread capacity,
     * recycle-ratio interval and batch chunk size.
     */
    protected Recycler(int maxCapacityPerThread, int interval, int chunkSize) {
        this(maxCapacityPerThread, interval, chunkSize, true, null, false);
    }

    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true}
     * as stated by {@link #Recycler(boolean)}.
     */
    protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, boolean unguarded) {
        this(maxCapacityPerThread, interval, chunkSize, true, null, unguarded);
    }

    /**
     * USE IT CAREFULLY!<br>
     * This is NOT enforcing pooled instances states to be validated if {@code unguarded = true}
     * as stated by {@link #Recycler(boolean)}.
     */
    protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, Thread owner, boolean unguarded) {
        this(maxCapacityPerThread, interval, chunkSize, false, owner, unguarded);
    }
257 
    /**
     * Master constructor every other constructor delegates to. Decides between per-thread storage
     * ({@code threadLocalPool}) and a single pinned/shared pool ({@code localPool}); exactly one of
     * the two fields ends up non-null.
     */
    @SuppressWarnings("unchecked")
    private Recycler(int maxCapacityPerThread, int ratio, int chunkSize, boolean useThreadLocalStorage,
                     Thread owner, boolean unguarded) {
        final int interval = max(0, ratio);
        if (maxCapacityPerThread <= 0) {
            // Pooling disabled entirely.
            maxCapacityPerThread = 0;
            chunkSize = 0;
        } else {
            // Clamp: capacity at least 4; chunk size at least 2 and at most half the capacity.
            maxCapacityPerThread = max(4, maxCapacityPerThread);
            chunkSize = max(2, min(chunkSize, maxCapacityPerThread >> 1));
        }
        if (maxCapacityPerThread > 0 && useThreadLocalStorage) {
            final int finalMaxCapacityPerThread = maxCapacityPerThread;
            final int finalChunkSize = chunkSize;
            threadLocalPool = new FastThreadLocal<LocalPool<?, T>>() {
                @Override
                protected LocalPool<?, T> initialValue() {
                    return unguarded? new UnguardedLocalPool<>(finalMaxCapacityPerThread, interval, finalChunkSize) :
                            new GuardedLocalPool<>(finalMaxCapacityPerThread, interval, finalChunkSize);
                }

                @Override
                protected void onRemoval(LocalPool<?, T> value) throws Exception {
                    super.onRemoval(value);
                    // Thread is going away: detach the queue and owner so pooled objects can be GC'ed.
                    MessagePassingQueue<?> handles = value.pooledHandles;
                    value.pooledHandles = null;
                    value.owner = null;
                    if (handles != null) {
                        handles.clear();
                    }
                }
            };
            localPool = null;
        } else {
            threadLocalPool = null;
            if (maxCapacityPerThread == 0) {
                localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
            } else {
                // Single pool pinned to `owner`; a null owner is a programming error here.
                Objects.requireNonNull(owner, "owner");
                localPool = unguarded? new UnguardedLocalPool<>(owner, maxCapacityPerThread, interval, chunkSize) :
                        new GuardedLocalPool<>(owner, maxCapacityPerThread, interval, chunkSize);
            }
        }
    }
302 
303     @SuppressWarnings("unchecked")
304     public final T get() {
305         if (localPool != null) {
306             return localPool.getWith(this);
307         } else {
308             if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()) {
309                 return newObject((Handle<T>) NOOP_HANDLE);
310             }
311             return threadLocalPool.get().getWith(this);
312         }
313     }
314 
315     /**
316      * Disassociates the {@link Recycler} from the current {@link Thread} if it was pinned,
317      * see {@link #Recycler(Thread, boolean)}.
318      * <p>
319      * Be aware that this method is not thread-safe: it's necessary to allow a {@link Thread} to
320      * be garbage collected even if {@link Handle}s are still referenced by other objects.
321      * <p>
322      */
323     public static void unpinOwner(Recycler<?> recycler) {
324         if (recycler.localPool != null) {
325             recycler.localPool.owner = null;
326         }
327     }
328 
329     /**
330      * @deprecated use {@link Handle#recycle(Object)}.
331      */
332     @Deprecated
333     public final boolean recycle(T o, Handle<T> handle) {
334         if (handle == NOOP_HANDLE) {
335             return false;
336         }
337 
338         handle.recycle(o);
339         return true;
340     }
341 
342     @VisibleForTesting
343     final int threadLocalSize() {
344         if (localPool != null) {
345             return localPool.size();
346         } else {
347             if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()) {
348                 return 0;
349             }
350             final LocalPool<?, T> pool = threadLocalPool.getIfExists();
351             if (pool == null) {
352                 return 0;
353             }
354             return pool.size();
355         }
356     }
357 
    /**
     * Creates a new object to be pooled; called whenever {@link #get()} cannot serve the
     * request from the pool.
     *
     * @param handle can NOT be null.
     */
    protected abstract T newObject(Handle<T> handle);

    @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
    public interface Handle<T> extends ObjectPool.Handle<T>  { }
365 
    /**
     * A {@link Handle} that additionally exposes an unguarded (no state-validation) recycle path.
     */
    @UnstableApi
    public abstract static class EnhancedHandle<T> implements Handle<T> {

        /**
         * Recycles {@code object} without the state checks performed by {@link #recycle(Object)};
         * the caller must guarantee no concurrent recycling of the same handle.
         */
        public abstract void unguardedRecycle(Object object);

        // Private constructor: only classes nested in Recycler can extend this type.
        private EnhancedHandle() {
        }
    }
374 
    /**
     * Handle used by {@link GuardedLocalPool}: tracks a CLAIMED/AVAILABLE state per handle to
     * detect double recycling and verifies that the recycled object is the one it owns.
     */
    private static final class DefaultHandle<T> extends EnhancedHandle<T> {
        private static final int STATE_CLAIMED = 0;
        private static final int STATE_AVAILABLE = 1;
        private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
        static {
            AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
            //noinspection unchecked
            STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
        }

        private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
        private final GuardedLocalPool<T> localPool;
        private T value;

        DefaultHandle(GuardedLocalPool<T> localPool) {
            this.localPool = localPool;
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            // Atomic CLAIMED -> AVAILABLE transition; throws on double recycle.
            toAvailable();
            localPool.release(this);
        }

        @Override
        public void unguardedRecycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            // Non-atomic check-then-set; relies on the caller guaranteeing no concurrent recycle.
            unguardedToAvailable();
            localPool.release(this);
        }

        // Hands out the pooled value, flipping the handle back to CLAIMED.
        // NOTE(review): lazySet (ordered, not volatile) write — presumably sufficient because only
        // the claiming thread recycles afterwards; confirm against the release path.
        T claim() {
            assert state == STATE_AVAILABLE;
            STATE_UPDATER.lazySet(this, STATE_CLAIMED);
            return value;
        }

        // Binds the freshly allocated object to this handle (called once after newObject()).
        void set(T value) {
            this.value = value;
        }

        // Guarded transition: getAndSet makes concurrent double-recycles detectable.
        private void toAvailable() {
            int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
            if (prev == STATE_AVAILABLE) {
                throw new IllegalStateException("Object has been recycled already.");
            }
        }

        // Unguarded transition: plain read plus ordered write; cheaper but not race-safe.
        private void unguardedToAvailable() {
            int prev = state;
            if (prev == STATE_AVAILABLE) {
                throw new IllegalStateException("Object has been recycled already.");
            }
            STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
        }
    }
436 
437     private static final class GuardedLocalPool<T> extends LocalPool<DefaultHandle<T>, T> {
438 
439         GuardedLocalPool(int maxCapacity) {
440             super(maxCapacity);
441         }
442 
443         GuardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
444             super(owner, maxCapacity, ratioInterval, chunkSize);
445         }
446 
447         GuardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
448             super(maxCapacity, ratioInterval, chunkSize);
449         }
450 
451         @Override
452         public T getWith(Recycler<T> recycler) {
453             DefaultHandle<T> handle = acquire();
454             T obj;
455             if (handle == null) {
456                 handle = canAllocatePooled()? new DefaultHandle<>(this) : null;
457                 if (handle != null) {
458                     obj = recycler.newObject(handle);
459                     handle.set(obj);
460                 } else {
461                     obj = recycler.newObject((Handle<T>) NOOP_HANDLE);
462                 }
463             } else {
464                 obj = handle.claim();
465             }
466             return obj;
467         }
468     }
469 
    /**
     * Pool variant that stores the pooled objects directly (no per-object state tracking):
     * recycling is not validated, so callers must guarantee correct single-release usage.
     */
    private static final class UnguardedLocalPool<T> extends LocalPool<T, T> {
        // One shared handle for every object of this pool; null only for the zero-capacity NOOP pool.
        private final EnhancedHandle<T> handle;

        UnguardedLocalPool(int maxCapacity) {
            super(maxCapacity);
            handle = maxCapacity == 0? null : new LocalPoolHandle<>(this);
        }

        UnguardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
            super(owner, maxCapacity, ratioInterval, chunkSize);
            handle = new LocalPoolHandle<>(this);
        }

        UnguardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
            super(maxCapacity, ratioInterval, chunkSize);
            handle = new LocalPoolHandle<>(this);
        }

        @Override
        public T getWith(Recycler<T> recycler) {
            T obj = acquire();
            if (obj == null) {
                // Pool miss: allocate, attaching the pooled handle only when the ratio gate allows.
                obj = recycler.newObject(canAllocatePooled()? handle : (Handle<T>) NOOP_HANDLE);
            }
            return obj;
        }
    }
497 
    /**
     * Storage for one pool: a small thread-confined batch array (fast path, only touched by
     * {@code owner}) plus a shared {@link MessagePassingQueue} for releases from other threads.
     *
     * @param <H> the pooled handle/storage type
     * @param <T> the pooled object type
     */
    private abstract static class LocalPool<H, T> {
        // -1: never create pooled handles; 0: always; N > 0: one in every N allocations.
        private final int ratioInterval;
        private final H[] batch; // thread-confined stack; null when there is no owner thread.
        private int batchSize;
        private Thread owner;
        private MessagePassingQueue<H> pooledHandles; // null once the pool has been torn down.
        private int ratioCounter;

        LocalPool(int maxCapacity) {
            // if there's no capacity, we need to never allocate pooled objects.
            // if there's capacity, because there is a shared pool, we always pool them, since we cannot trust the
            // thread unsafe ratio counter.
            this.ratioInterval = maxCapacity == 0? -1 : 0;
            this.owner = null;
            batch = null;
            batchSize = 0;
            pooledHandles = createExternalMcPool(maxCapacity);
            ratioCounter = 0;
        }

        @SuppressWarnings("unchecked")
        LocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
            this.ratioInterval = ratioInterval;
            this.owner = owner;
            batch = owner != null? (H[]) new Object[chunkSize] : null;
            batchSize = 0;
            pooledHandles = createExternalScPool(chunkSize, maxCapacity);
            ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
        }

        // Multi-consumer queue for the shared pool variant, where any thread may acquire.
        private static <H> MessagePassingQueue<H> createExternalMcPool(int maxCapacity) {
            if (maxCapacity == 0) {
                return null;
            }
            if (BLOCKING_POOL) {
                return new BlockingMessageQueue<>(maxCapacity);
            }
            return (MessagePassingQueue<H>) newFixedMpmcQueue(maxCapacity);
        }

        // Single-consumer (MPSC) queue for the per-thread variant, where only the owner polls.
        private static <H> MessagePassingQueue<H> createExternalScPool(int chunkSize, int maxCapacity) {
            if (maxCapacity == 0) {
                return null;
            }
            if (BLOCKING_POOL) {
                return new BlockingMessageQueue<>(maxCapacity);
            }
            return (MessagePassingQueue<H>) newMpscQueue(chunkSize, maxCapacity);
        }

        LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
            // Use the batch fast path only for threads that will clean up their FastThreadLocals,
            // unless the batchFastThreadLocalOnly property was turned off.
            this(!BATCH_FAST_TL_ONLY || FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals()
                         ? Thread.currentThread() : null, maxCapacity, ratioInterval, chunkSize);
        }

        // Pops from the thread-confined batch first, falling back to the shared queue.
        protected final H acquire() {
            int size = batchSize;
            if (size == 0) {
                // it's ok to be racy; at worst we reuse something that won't return back to the pool
                final MessagePassingQueue<H> handles = pooledHandles;
                if (handles == null) {
                    return null;
                }
                return handles.relaxedPoll();
            }
            int top = size - 1;
            final H h = batch[top];
            batchSize = top;
            batch[top] = null; // don't keep a stale reference alive in the batch slot
            return h;
        }

        protected final void release(H handle) {
            Thread owner = this.owner;
            if (owner != null && Thread.currentThread() == owner && batchSize < batch.length) {
                // Fast path: the owner thread pushes onto its private batch.
                batch[batchSize] = handle;
                batchSize++;
            } else if (owner != null && isTerminated(owner)) {
                // Owner thread died: drop the queue so its contents can be garbage collected.
                pooledHandles = null;
                this.owner = null;
            } else {
                // Foreign thread (or the batch is full): offer to the shared queue; dropped on overflow.
                MessagePassingQueue<H> handles = pooledHandles;
                if (handles != null) {
                    handles.relaxedOffer(handle);
                }
            }
        }

        private static boolean isTerminated(Thread owner) {
            // Do not use `Thread.getState()` in J9 JVM because it's known to have a performance issue.
            // See: https://github.com/netty/netty/issues/13347#issuecomment-1518537895
            return PlatformDependent.isJ9Jvm()? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
        }

        // Ratio gate: decides whether a pool miss may create a new *pooled* handle.
        boolean canAllocatePooled() {
            if (ratioInterval < 0) {
                return false;
            }
            if (ratioInterval == 0) {
                return true;
            }
            if (++ratioCounter >= ratioInterval) {
                ratioCounter = 0;
                return true;
            }
            return false;
        }

        abstract T getWith(Recycler<T> recycler);

        // Approximate size (racy by design): shared queue plus the thread-confined batch.
        int size() {
            MessagePassingQueue<H> handles = pooledHandles;
            final int externalSize = handles != null? handles.size() : 0;
            return externalSize + (batch != null? batchSize : 0);
        }
    }
614 
615     /**
616      * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
617      * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
618      * The implementation relies on synchronised monitor locks for thread-safety.
619      * The {@code fill} bulk operation is not supported by this implementation.
620      */
621     private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
622         private final Queue<T> deque;
623         private final int maxCapacity;
624 
625         BlockingMessageQueue(int maxCapacity) {
626             this.maxCapacity = maxCapacity;
627             // This message passing queue is backed by an ArrayDeque instance,
628             // made thread-safe by synchronising on `this` BlockingMessageQueue instance.
629             // Why ArrayDeque?
630             // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
631             // We use ArrayDeque instead of ArrayList because we need the queue APIs.
632             // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
633             // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
634             // and these queues will usually have large capacities, in potentially great numbers (one per thread),
635             // but often only have comparatively few items in them.
636             deque = new ArrayDeque<T>();
637         }
638 
639         @Override
640         public synchronized boolean offer(T e) {
641             if (deque.size() == maxCapacity) {
642                 return false;
643             }
644             return deque.offer(e);
645         }
646 
647         @Override
648         public synchronized T poll() {
649             return deque.poll();
650         }
651 
652         @Override
653         public synchronized T peek() {
654             return deque.peek();
655         }
656 
657         @Override
658         public synchronized int size() {
659             return deque.size();
660         }
661 
662         @Override
663         public synchronized void clear() {
664             deque.clear();
665         }
666 
667         @Override
668         public synchronized boolean isEmpty() {
669             return deque.isEmpty();
670         }
671 
672         @Override
673         public int capacity() {
674             return maxCapacity;
675         }
676 
677         @Override
678         public boolean relaxedOffer(T e) {
679             return offer(e);
680         }
681 
682         @Override
683         public T relaxedPoll() {
684             return poll();
685         }
686 
687         @Override
688         public T relaxedPeek() {
689             return peek();
690         }
691 
692         @Override
693         public int drain(Consumer<T> c, int limit) {
694             T obj;
695             int i = 0;
696             for (; i < limit && (obj = poll()) != null; i++) {
697                 c.accept(obj);
698             }
699             return i;
700         }
701 
702         @Override
703         public int fill(Supplier<T> s, int limit) {
704             throw new UnsupportedOperationException();
705         }
706 
707         @Override
708         public int drain(Consumer<T> c) {
709             throw new UnsupportedOperationException();
710         }
711 
712         @Override
713         public int fill(Supplier<T> s) {
714             throw new UnsupportedOperationException();
715         }
716 
717         @Override
718         public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
719             throw new UnsupportedOperationException();
720         }
721 
722         @Override
723         public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
724             throw new UnsupportedOperationException();
725         }
726     }
727 }