View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42  
43      @SuppressWarnings("rawtypes")
44      private static final Handle NOOP_HANDLE = new Handle() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49      };
50      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
51      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
52      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
53      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
54      private static final int INITIAL_CAPACITY;
55      private static final int MAX_SHARED_CAPACITY_FACTOR;
56      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
57      private static final int LINK_CAPACITY;
58      private static final int RATIO;
59  
60      static {
61          // In the future, we might have different maxCapacity for different object types.
62          // e.g. io.netty.recycler.maxCapacity.writeTask
63          //      io.netty.recycler.maxCapacity.outboundBuffer
64          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
65                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
66          if (maxCapacityPerThread < 0) {
67              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
68          }
69  
70          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
71  
72          MAX_SHARED_CAPACITY_FACTOR = max(2,
73                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
74                          2));
75  
76          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
77                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
78                          // We use the same value as default EventLoop number
79                          NettyRuntime.availableProcessors() * 2));
80  
81          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
82                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
83  
84          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
85          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
86          // bursts.
87          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
88  
89          if (logger.isDebugEnabled()) {
90              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
91                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
92                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
93                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
94                  logger.debug("-Dio.netty.recycler.ratio: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
98                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
99                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
100             }
101         }
102 
103         INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
104     }
105 
106     private final int maxCapacityPerThread;
107     private final int maxSharedCapacityFactor;
108     private final int ratioMask;
109     private final int maxDelayedQueuesPerThread;
110 
111     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
112         @Override
113         protected Stack<T> initialValue() {
114             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
115                     ratioMask, maxDelayedQueuesPerThread);
116         }
117 
118         @Override
119         protected void onRemoval(Stack<T> value) {
120             // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
121             if (value.threadRef.get() == Thread.currentThread()) {
122                if (DELAYED_RECYCLED.isSet()) {
123                    DELAYED_RECYCLED.get().remove(value);
124                }
125             }
126         }
127     };
128 
129     protected Recycler() {
130         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
131     }
132 
133     protected Recycler(int maxCapacityPerThread) {
134         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
135     }
136 
137     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
138         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
139     }
140 
141     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
142                        int ratio, int maxDelayedQueuesPerThread) {
143         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
144         if (maxCapacityPerThread <= 0) {
145             this.maxCapacityPerThread = 0;
146             this.maxSharedCapacityFactor = 1;
147             this.maxDelayedQueuesPerThread = 0;
148         } else {
149             this.maxCapacityPerThread = maxCapacityPerThread;
150             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
151             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
152         }
153     }
154 
155     @SuppressWarnings("unchecked")
156     public final T get() {
157         if (maxCapacityPerThread == 0) {
158             return newObject((Handle<T>) NOOP_HANDLE);
159         }
160         Stack<T> stack = threadLocal.get();
161         DefaultHandle<T> handle = stack.pop();
162         if (handle == null) {
163             handle = stack.newHandle();
164             handle.value = newObject(handle);
165         }
166         return (T) handle.value;
167     }
168 
169     /**
170      * @deprecated use {@link Handle#recycle(Object)}.
171      */
172     @Deprecated
173     public final boolean recycle(T o, Handle<T> handle) {
174         if (handle == NOOP_HANDLE) {
175             return false;
176         }
177 
178         DefaultHandle<T> h = (DefaultHandle<T>) handle;
179         if (h.stack.parent != this) {
180             return false;
181         }
182 
183         h.recycle(o);
184         return true;
185     }
186 
187     final int threadLocalCapacity() {
188         return threadLocal.get().elements.length;
189     }
190 
191     final int threadLocalSize() {
192         return threadLocal.get().size;
193     }
194 
195     protected abstract T newObject(Handle<T> handle);
196 
197     public interface Handle<T> {
198         void recycle(T object);
199     }
200 
201     static final class DefaultHandle<T> implements Handle<T> {
202         private int lastRecycledId;
203         private int recycleId;
204 
205         boolean hasBeenRecycled;
206 
207         private Stack<?> stack;
208         private Object value;
209 
210         DefaultHandle(Stack<?> stack) {
211             this.stack = stack;
212         }
213 
214         @Override
215         public void recycle(Object object) {
216             if (object != value) {
217                 throw new IllegalArgumentException("object does not belong to handle");
218             }
219 
220             Stack<?> stack = this.stack;
221             if (lastRecycledId != recycleId || stack == null) {
222                 throw new IllegalStateException("recycled already");
223             }
224 
225             stack.push(this);
226         }
227     }
228 
229     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
230             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
231         @Override
232         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
233             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
234         }
235     };
236 
237     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
238     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
239     private static final class WeakOrderQueue {
240 
241         static final WeakOrderQueue DUMMY = new WeakOrderQueue();
242 
243         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
244         @SuppressWarnings("serial")
245         static final class Link extends AtomicInteger {
246             private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
247 
248             private int readIndex;
249             Link next;
250         }
251 
252         // This act as a place holder for the head Link but also will reclaim space once finalized.
253         // Its important this does not hold any reference to either Stack or WeakOrderQueue.
254         static final class Head {
255             private final AtomicInteger availableSharedCapacity;
256 
257             Link link;
258 
259             Head(AtomicInteger availableSharedCapacity) {
260                 this.availableSharedCapacity = availableSharedCapacity;
261             }
262 
263             /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
264             @Override
265             protected void finalize() throws Throwable {
266                 try {
267                     super.finalize();
268                 } finally {
269                     Link head = link;
270                     link = null;
271                     while (head != null) {
272                         reclaimSpace(LINK_CAPACITY);
273                         Link next = head.next;
274                         // Unlink to help GC and guard against GC nepotism.
275                         head.next = null;
276                         head = next;
277                     }
278                 }
279             }
280 
281             void reclaimSpace(int space) {
282                 assert space >= 0;
283                 availableSharedCapacity.addAndGet(space);
284             }
285 
286             boolean reserveSpace(int space) {
287                 return reserveSpace(availableSharedCapacity, space);
288             }
289 
290             static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
291                 assert space >= 0;
292                 for (;;) {
293                     int available = availableSharedCapacity.get();
294                     if (available < space) {
295                         return false;
296                     }
297                     if (availableSharedCapacity.compareAndSet(available, available - space)) {
298                         return true;
299                     }
300                 }
301             }
302         }
303 
304         // chain of data items
305         private final Head head;
306         private Link tail;
307         // pointer to another queue of delayed items for the same stack
308         private WeakOrderQueue next;
309         private final WeakReference<Thread> owner;
310         private final int id = ID_GENERATOR.getAndIncrement();
311 
312         private WeakOrderQueue() {
313             owner = null;
314             head = new Head(null);
315         }
316 
317         private WeakOrderQueue(Stack<?> stack, Thread thread) {
318             tail = new Link();
319 
320             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
321             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
322             // Stack itself GCed.
323             head = new Head(stack.availableSharedCapacity);
324             head.link = tail;
325             owner = new WeakReference<Thread>(thread);
326         }
327 
328         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
329             final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
330             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
331             // may be accessed while its still constructed.
332             stack.setHead(queue);
333 
334             return queue;
335         }
336 
337         private void setNext(WeakOrderQueue next) {
338             assert next != this;
339             this.next = next;
340         }
341 
342         /**
343          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
344          */
345         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
346             // We allocated a Link so reserve the space
347             return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
348                     ? newQueue(stack, thread) : null;
349         }
350 
351         void add(DefaultHandle<?> handle) {
352             handle.lastRecycledId = id;
353 
354             Link tail = this.tail;
355             int writeIndex;
356             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
357                 if (!head.reserveSpace(LINK_CAPACITY)) {
358                     // Drop it.
359                     return;
360                 }
361                 // We allocate a Link so reserve the space
362                 this.tail = tail = tail.next = new Link();
363 
364                 writeIndex = tail.get();
365             }
366             tail.elements[writeIndex] = handle;
367             handle.stack = null;
368             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
369             // this also means we guarantee visibility of an element in the queue if we see the index updated
370             tail.lazySet(writeIndex + 1);
371         }
372 
373         boolean hasFinalData() {
374             return tail.readIndex != tail.get();
375         }
376 
377         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
378         @SuppressWarnings("rawtypes")
379         boolean transfer(Stack<?> dst) {
380             Link head = this.head.link;
381             if (head == null) {
382                 return false;
383             }
384 
385             if (head.readIndex == LINK_CAPACITY) {
386                 if (head.next == null) {
387                     return false;
388                 }
389                 this.head.link = head = head.next;
390             }
391 
392             final int srcStart = head.readIndex;
393             int srcEnd = head.get();
394             final int srcSize = srcEnd - srcStart;
395             if (srcSize == 0) {
396                 return false;
397             }
398 
399             final int dstSize = dst.size;
400             final int expectedCapacity = dstSize + srcSize;
401 
402             if (expectedCapacity > dst.elements.length) {
403                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
404                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
405             }
406 
407             if (srcStart != srcEnd) {
408                 final DefaultHandle[] srcElems = head.elements;
409                 final DefaultHandle[] dstElems = dst.elements;
410                 int newDstSize = dstSize;
411                 for (int i = srcStart; i < srcEnd; i++) {
412                     DefaultHandle element = srcElems[i];
413                     if (element.recycleId == 0) {
414                         element.recycleId = element.lastRecycledId;
415                     } else if (element.recycleId != element.lastRecycledId) {
416                         throw new IllegalStateException("recycled already");
417                     }
418                     srcElems[i] = null;
419 
420                     if (dst.dropHandle(element)) {
421                         // Drop the object.
422                         continue;
423                     }
424                     element.stack = dst;
425                     dstElems[newDstSize ++] = element;
426                 }
427 
428                 if (srcEnd == LINK_CAPACITY && head.next != null) {
429                     // Add capacity back as the Link is GCed.
430                     this.head.reclaimSpace(LINK_CAPACITY);
431                     this.head.link = head.next;
432                 }
433 
434                 head.readIndex = srcEnd;
435                 if (dst.size == newDstSize) {
436                     return false;
437                 }
438                 dst.size = newDstSize;
439                 return true;
440             } else {
441                 // The destination stack is full already.
442                 return false;
443             }
444         }
445     }
446 
447     static final class Stack<T> {
448 
449         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
450         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
451         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
452         // still recycling all items.
453         final Recycler<T> parent;
454 
455         // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
456         // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
457         //
458         // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
459         // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
460         // it in a timely manner).
461         final WeakReference<Thread> threadRef;
462         final AtomicInteger availableSharedCapacity;
463         final int maxDelayedQueues;
464 
465         private final int maxCapacity;
466         private final int ratioMask;
467         private DefaultHandle<?>[] elements;
468         private int size;
469         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
470         private WeakOrderQueue cursor, prev;
471         private volatile WeakOrderQueue head;
472 
473         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
474               int ratioMask, int maxDelayedQueues) {
475             this.parent = parent;
476             threadRef = new WeakReference<Thread>(thread);
477             this.maxCapacity = maxCapacity;
478             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
479             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
480             this.ratioMask = ratioMask;
481             this.maxDelayedQueues = maxDelayedQueues;
482         }
483 
484         // Marked as synchronized to ensure this is serialized.
485         synchronized void setHead(WeakOrderQueue queue) {
486             queue.setNext(head);
487             head = queue;
488         }
489 
490         int increaseCapacity(int expectedCapacity) {
491             int newCapacity = elements.length;
492             int maxCapacity = this.maxCapacity;
493             do {
494                 newCapacity <<= 1;
495             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
496 
497             newCapacity = min(newCapacity, maxCapacity);
498             if (newCapacity != elements.length) {
499                 elements = Arrays.copyOf(elements, newCapacity);
500             }
501 
502             return newCapacity;
503         }
504 
505         @SuppressWarnings({ "unchecked", "rawtypes" })
506         DefaultHandle<T> pop() {
507             int size = this.size;
508             if (size == 0) {
509                 if (!scavenge()) {
510                     return null;
511                 }
512                 size = this.size;
513             }
514             size --;
515             DefaultHandle ret = elements[size];
516             elements[size] = null;
517             if (ret.lastRecycledId != ret.recycleId) {
518                 throw new IllegalStateException("recycled multiple times");
519             }
520             ret.recycleId = 0;
521             ret.lastRecycledId = 0;
522             this.size = size;
523             return ret;
524         }
525 
526         boolean scavenge() {
527             // continue an existing scavenge, if any
528             if (scavengeSome()) {
529                 return true;
530             }
531 
532             // reset our scavenge cursor
533             prev = null;
534             cursor = head;
535             return false;
536         }
537 
538         boolean scavengeSome() {
539             WeakOrderQueue prev;
540             WeakOrderQueue cursor = this.cursor;
541             if (cursor == null) {
542                 prev = null;
543                 cursor = head;
544                 if (cursor == null) {
545                     return false;
546                 }
547             } else {
548                 prev = this.prev;
549             }
550 
551             boolean success = false;
552             do {
553                 if (cursor.transfer(this)) {
554                     success = true;
555                     break;
556                 }
557                 WeakOrderQueue next = cursor.next;
558                 if (cursor.owner.get() == null) {
559                     // If the thread associated with the queue is gone, unlink it, after
560                     // performing a volatile read to confirm there is no data left to collect.
561                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
562                     if (cursor.hasFinalData()) {
563                         for (;;) {
564                             if (cursor.transfer(this)) {
565                                 success = true;
566                             } else {
567                                 break;
568                             }
569                         }
570                     }
571 
572                     if (prev != null) {
573                         prev.setNext(next);
574                     }
575                 } else {
576                     prev = cursor;
577                 }
578 
579                 cursor = next;
580 
581             } while (cursor != null && !success);
582 
583             this.prev = prev;
584             this.cursor = cursor;
585             return success;
586         }
587 
588         void push(DefaultHandle<?> item) {
589             Thread currentThread = Thread.currentThread();
590             if (threadRef.get() == currentThread) {
591                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
592                 pushNow(item);
593             } else {
594                 // The current Thread is not the one that belongs to the Stack
595                 // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
596                 // happens later.
597                 pushLater(item, currentThread);
598             }
599         }
600 
601         private void pushNow(DefaultHandle<?> item) {
602             if ((item.recycleId | item.lastRecycledId) != 0) {
603                 throw new IllegalStateException("recycled already");
604             }
605             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
606 
607             int size = this.size;
608             if (size >= maxCapacity || dropHandle(item)) {
609                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
610                 return;
611             }
612             if (size == elements.length) {
613                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
614             }
615 
616             elements[size] = item;
617             this.size = size + 1;
618         }
619 
620         private void pushLater(DefaultHandle<?> item, Thread thread) {
621             // we don't want to have a ref to the queue as the value in our weak map
622             // so we null it out; to ensure there are no races with restoring it later
623             // we impose a memory ordering here (no-op on x86)
624             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
625             WeakOrderQueue queue = delayedRecycled.get(this);
626             if (queue == null) {
627                 if (delayedRecycled.size() >= maxDelayedQueues) {
628                     // Add a dummy queue so we know we should drop the object
629                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
630                     return;
631                 }
632                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
633                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
634                     // drop object
635                     return;
636                 }
637                 delayedRecycled.put(this, queue);
638             } else if (queue == WeakOrderQueue.DUMMY) {
639                 // drop object
640                 return;
641             }
642 
643             queue.add(item);
644         }
645 
646         boolean dropHandle(DefaultHandle<?> handle) {
647             if (!handle.hasBeenRecycled) {
648                 if ((++handleRecycleCount & ratioMask) != 0) {
649                     // Drop the object.
650                     return true;
651                 }
652                 handle.hasBeenRecycled = true;
653             }
654             return false;
655         }
656 
657         DefaultHandle<T> newHandle() {
658             return new DefaultHandle<T>(this);
659         }
660     }
661 }