View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42  
43      @SuppressWarnings("rawtypes")
44      private static final Handle NOOP_HANDLE = new Handle() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49      };
50      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
51      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
52      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
53      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
54      private static final int INITIAL_CAPACITY;
55      private static final int MAX_SHARED_CAPACITY_FACTOR;
56      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
57      private static final int LINK_CAPACITY;
58      private static final int RATIO;
59  
60      static {
61          // In the future, we might have different maxCapacity for different object types.
62          // e.g. io.netty.recycler.maxCapacity.writeTask
63          //      io.netty.recycler.maxCapacity.outboundBuffer
64          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
65                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
66          if (maxCapacityPerThread < 0) {
67              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
68          }
69  
70          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
71  
72          MAX_SHARED_CAPACITY_FACTOR = max(2,
73                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
74                          2));
75  
76          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
77                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
78                          // We use the same value as default EventLoop number
79                          NettyRuntime.availableProcessors() * 2));
80  
81          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
82                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
83  
84          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
85          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
86          // bursts.
87          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
88  
89          if (logger.isDebugEnabled()) {
90              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
91                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
92                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
93                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
94                  logger.debug("-Dio.netty.recycler.ratio: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
98                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
99                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
100             }
101         }
102 
103         INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
104     }
105 
106     private final int maxCapacityPerThread;
107     private final int maxSharedCapacityFactor;
108     private final int ratioMask;
109     private final int maxDelayedQueuesPerThread;
110 
111     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
112         @Override
113         protected Stack<T> initialValue() {
114             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
115                     ratioMask, maxDelayedQueuesPerThread);
116         }
117 
118         @Override
119         protected void onRemoval(Stack<T> value) {
120             // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
121             if (value.threadRef.get() == Thread.currentThread()) {
122                if (DELAYED_RECYCLED.isSet()) {
123                    DELAYED_RECYCLED.get().remove(value);
124                }
125             }
126         }
127     };
128 
129     protected Recycler() {
130         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
131     }
132 
133     protected Recycler(int maxCapacityPerThread) {
134         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
135     }
136 
137     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
138         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
139     }
140 
141     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
142                        int ratio, int maxDelayedQueuesPerThread) {
143         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
144         if (maxCapacityPerThread <= 0) {
145             this.maxCapacityPerThread = 0;
146             this.maxSharedCapacityFactor = 1;
147             this.maxDelayedQueuesPerThread = 0;
148         } else {
149             this.maxCapacityPerThread = maxCapacityPerThread;
150             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
151             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
152         }
153     }
154 
155     @SuppressWarnings("unchecked")
156     public final T get() {
157         if (maxCapacityPerThread == 0) {
158             return newObject((Handle<T>) NOOP_HANDLE);
159         }
160         Stack<T> stack = threadLocal.get();
161         DefaultHandle<T> handle = stack.pop();
162         if (handle == null) {
163             handle = stack.newHandle();
164             handle.value = newObject(handle);
165         }
166         return (T) handle.value;
167     }
168 
169     /**
170      * @deprecated use {@link Handle#recycle(Object)}.
171      */
172     @Deprecated
173     public final boolean recycle(T o, Handle<T> handle) {
174         if (handle == NOOP_HANDLE) {
175             return false;
176         }
177 
178         DefaultHandle<T> h = (DefaultHandle<T>) handle;
179         if (h.stack.parent != this) {
180             return false;
181         }
182 
183         h.recycle(o);
184         return true;
185     }
186 
187     final int threadLocalCapacity() {
188         return threadLocal.get().elements.length;
189     }
190 
191     final int threadLocalSize() {
192         return threadLocal.get().size;
193     }
194 
    /**
     * Creates a new instance of the pooled type. Called by {@link #get()} whenever no pooled instance can be
     * reused.
     *
     * @param handle the handle bound to the new object; it is the no-op handle when pooling is disabled
     * @return the newly created object
     */
    protected abstract T newObject(Handle<T> handle);
196 
    /**
     * Callback through which an object obtained from {@link Recycler#get()} is given back for reuse.
     *
     * @param <T> the type of the pooled object
     */
    public interface Handle<T> {
        /**
         * Gives {@code object} back for reuse.
         *
         * @param object the object bound to this handle
         */
        void recycle(T object);
    }
200 
201     static final class DefaultHandle<T> implements Handle<T> {
202         private int lastRecycledId;
203         private int recycleId;
204 
205         boolean hasBeenRecycled;
206 
207         private Stack<?> stack;
208         private Object value;
209 
210         DefaultHandle(Stack<?> stack) {
211             this.stack = stack;
212         }
213 
214         @Override
215         public void recycle(Object object) {
216             if (object != value) {
217                 throw new IllegalArgumentException("object does not belong to handle");
218             }
219             stack.push(this);
220         }
221     }
222 
    // Per-thread map from a Stack owned by another thread to the WeakOrderQueue the current thread uses to hand
    // objects back to that Stack. The map is a WeakHashMap keyed by Stack, and is created on first use.
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
            new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        @Override
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();
        }
    };
230 
231     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
232     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
233     private static final class WeakOrderQueue {
234 
        // Marker stored in the DELAYED_RECYCLED map for a Stack whose objects the current thread must drop
        // because its delayed-queue limit was reached (see Stack.pushLater).
        static final WeakOrderQueue DUMMY = new WeakOrderQueue();

        // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
        @SuppressWarnings("serial")
        static final class Link extends AtomicInteger {
            // Fixed-size slot array; slots are filled by add(...) and cleared by transfer(...).
            private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

            // Index of the next slot that transfer(...) will consume.
            private int readIndex;
            // The Link that follows this one in the chain, or null if this is the last one.
            Link next;
        }
245 
246         // This act as a place holder for the head Link but also will reclaim space once finalized.
247         // Its important this does not hold any reference to either Stack or WeakOrderQueue.
248         static final class Head {
249             private final AtomicInteger availableSharedCapacity;
250 
251             Link link;
252 
253             Head(AtomicInteger availableSharedCapacity) {
254                 this.availableSharedCapacity = availableSharedCapacity;
255             }
256 
257             /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
258             @Override
259             protected void finalize() throws Throwable {
260                 try {
261                     super.finalize();
262                 } finally {
263                     Link head = link;
264                     link = null;
265                     while (head != null) {
266                         reclaimSpace(LINK_CAPACITY);
267                         Link next = head.next;
268                         // Unlink to help GC and guard against GC nepotism.
269                         head.next = null;
270                         head = next;
271                     }
272                 }
273             }
274 
275             void reclaimSpace(int space) {
276                 assert space >= 0;
277                 availableSharedCapacity.addAndGet(space);
278             }
279 
280             boolean reserveSpace(int space) {
281                 return reserveSpace(availableSharedCapacity, space);
282             }
283 
284             static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
285                 assert space >= 0;
286                 for (;;) {
287                     int available = availableSharedCapacity.get();
288                     if (available < space) {
289                         return false;
290                     }
291                     if (availableSharedCapacity.compareAndSet(available, available - space)) {
292                         return true;
293                     }
294                 }
295             }
296         }
297 
298         // chain of data items
299         private final Head head;
300         private Link tail;
301         // pointer to another queue of delayed items for the same stack
302         private WeakOrderQueue next;
303         private final WeakReference<Thread> owner;
304         private final int id = ID_GENERATOR.getAndIncrement();
305 
306         private WeakOrderQueue() {
307             owner = null;
308             head = new Head(null);
309         }
310 
311         private WeakOrderQueue(Stack<?> stack, Thread thread) {
312             tail = new Link();
313 
314             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
315             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
316             // Stack itself GCed.
317             head = new Head(stack.availableSharedCapacity);
318             head.link = tail;
319             owner = new WeakReference<Thread>(thread);
320         }
321 
322         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
323             final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
324             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
325             // may be accessed while its still constructed.
326             stack.setHead(queue);
327 
328             return queue;
329         }
330 
331         private void setNext(WeakOrderQueue next) {
332             assert next != this;
333             this.next = next;
334         }
335 
336         /**
337          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
338          */
339         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
340             // We allocated a Link so reserve the space
341             return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
342                     ? newQueue(stack, thread) : null;
343         }
344 
345         void add(DefaultHandle<?> handle) {
346             handle.lastRecycledId = id;
347 
348             Link tail = this.tail;
349             int writeIndex;
350             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
351                 if (!head.reserveSpace(LINK_CAPACITY)) {
352                     // Drop it.
353                     return;
354                 }
355                 // We allocate a Link so reserve the space
356                 this.tail = tail = tail.next = new Link();
357 
358                 writeIndex = tail.get();
359             }
360             tail.elements[writeIndex] = handle;
361             handle.stack = null;
362             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
363             // this also means we guarantee visibility of an element in the queue if we see the index updated
364             tail.lazySet(writeIndex + 1);
365         }
366 
367         boolean hasFinalData() {
368             return tail.readIndex != tail.get();
369         }
370 
371         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
372         @SuppressWarnings("rawtypes")
373         boolean transfer(Stack<?> dst) {
374             Link head = this.head.link;
375             if (head == null) {
376                 return false;
377             }
378 
379             if (head.readIndex == LINK_CAPACITY) {
380                 if (head.next == null) {
381                     return false;
382                 }
383                 this.head.link = head = head.next;
384             }
385 
386             final int srcStart = head.readIndex;
387             int srcEnd = head.get();
388             final int srcSize = srcEnd - srcStart;
389             if (srcSize == 0) {
390                 return false;
391             }
392 
393             final int dstSize = dst.size;
394             final int expectedCapacity = dstSize + srcSize;
395 
396             if (expectedCapacity > dst.elements.length) {
397                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
398                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
399             }
400 
401             if (srcStart != srcEnd) {
402                 final DefaultHandle[] srcElems = head.elements;
403                 final DefaultHandle[] dstElems = dst.elements;
404                 int newDstSize = dstSize;
405                 for (int i = srcStart; i < srcEnd; i++) {
406                     DefaultHandle element = srcElems[i];
407                     if (element.recycleId == 0) {
408                         element.recycleId = element.lastRecycledId;
409                     } else if (element.recycleId != element.lastRecycledId) {
410                         throw new IllegalStateException("recycled already");
411                     }
412                     srcElems[i] = null;
413 
414                     if (dst.dropHandle(element)) {
415                         // Drop the object.
416                         continue;
417                     }
418                     element.stack = dst;
419                     dstElems[newDstSize ++] = element;
420                 }
421 
422                 if (srcEnd == LINK_CAPACITY && head.next != null) {
423                     // Add capacity back as the Link is GCed.
424                     this.head.reclaimSpace(LINK_CAPACITY);
425                     this.head.link = head.next;
426                 }
427 
428                 head.readIndex = srcEnd;
429                 if (dst.size == newDstSize) {
430                     return false;
431                 }
432                 dst.size = newDstSize;
433                 return true;
434             } else {
435                 // The destination stack is full already.
436                 return false;
437             }
438         }
439     }
440 
441     static final class Stack<T> {
442 
443         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
444         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
445         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
446         // still recycling all items.
447         final Recycler<T> parent;
448 
449         // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
450         // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
451         //
452         // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
453         // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
454         // it in a timely manner).
455         final WeakReference<Thread> threadRef;
456         final AtomicInteger availableSharedCapacity;
457         final int maxDelayedQueues;
458 
459         private final int maxCapacity;
460         private final int ratioMask;
461         private DefaultHandle<?>[] elements;
462         private int size;
463         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
464         private WeakOrderQueue cursor, prev;
465         private volatile WeakOrderQueue head;
466 
467         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
468               int ratioMask, int maxDelayedQueues) {
469             this.parent = parent;
470             threadRef = new WeakReference<Thread>(thread);
471             this.maxCapacity = maxCapacity;
472             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
473             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
474             this.ratioMask = ratioMask;
475             this.maxDelayedQueues = maxDelayedQueues;
476         }
477 
478         // Marked as synchronized to ensure this is serialized.
479         synchronized void setHead(WeakOrderQueue queue) {
480             queue.setNext(head);
481             head = queue;
482         }
483 
484         int increaseCapacity(int expectedCapacity) {
485             int newCapacity = elements.length;
486             int maxCapacity = this.maxCapacity;
487             do {
488                 newCapacity <<= 1;
489             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
490 
491             newCapacity = min(newCapacity, maxCapacity);
492             if (newCapacity != elements.length) {
493                 elements = Arrays.copyOf(elements, newCapacity);
494             }
495 
496             return newCapacity;
497         }
498 
499         @SuppressWarnings({ "unchecked", "rawtypes" })
500         DefaultHandle<T> pop() {
501             int size = this.size;
502             if (size == 0) {
503                 if (!scavenge()) {
504                     return null;
505                 }
506                 size = this.size;
507             }
508             size --;
509             DefaultHandle ret = elements[size];
510             elements[size] = null;
511             if (ret.lastRecycledId != ret.recycleId) {
512                 throw new IllegalStateException("recycled multiple times");
513             }
514             ret.recycleId = 0;
515             ret.lastRecycledId = 0;
516             this.size = size;
517             return ret;
518         }
519 
520         boolean scavenge() {
521             // continue an existing scavenge, if any
522             if (scavengeSome()) {
523                 return true;
524             }
525 
526             // reset our scavenge cursor
527             prev = null;
528             cursor = head;
529             return false;
530         }
531 
532         boolean scavengeSome() {
533             WeakOrderQueue prev;
534             WeakOrderQueue cursor = this.cursor;
535             if (cursor == null) {
536                 prev = null;
537                 cursor = head;
538                 if (cursor == null) {
539                     return false;
540                 }
541             } else {
542                 prev = this.prev;
543             }
544 
545             boolean success = false;
546             do {
547                 if (cursor.transfer(this)) {
548                     success = true;
549                     break;
550                 }
551                 WeakOrderQueue next = cursor.next;
552                 if (cursor.owner.get() == null) {
553                     // If the thread associated with the queue is gone, unlink it, after
554                     // performing a volatile read to confirm there is no data left to collect.
555                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
556                     if (cursor.hasFinalData()) {
557                         for (;;) {
558                             if (cursor.transfer(this)) {
559                                 success = true;
560                             } else {
561                                 break;
562                             }
563                         }
564                     }
565 
566                     if (prev != null) {
567                         prev.setNext(next);
568                     }
569                 } else {
570                     prev = cursor;
571                 }
572 
573                 cursor = next;
574 
575             } while (cursor != null && !success);
576 
577             this.prev = prev;
578             this.cursor = cursor;
579             return success;
580         }
581 
582         void push(DefaultHandle<?> item) {
583             Thread currentThread = Thread.currentThread();
584             if (threadRef.get() == currentThread) {
585                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
586                 pushNow(item);
587             } else {
588                 // The current Thread is not the one that belongs to the Stack
589                 // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
590                 // happens later.
591                 pushLater(item, currentThread);
592             }
593         }
594 
595         private void pushNow(DefaultHandle<?> item) {
596             if ((item.recycleId | item.lastRecycledId) != 0) {
597                 throw new IllegalStateException("recycled already");
598             }
599             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
600 
601             int size = this.size;
602             if (size >= maxCapacity || dropHandle(item)) {
603                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
604                 return;
605             }
606             if (size == elements.length) {
607                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
608             }
609 
610             elements[size] = item;
611             this.size = size + 1;
612         }
613 
614         private void pushLater(DefaultHandle<?> item, Thread thread) {
615             // we don't want to have a ref to the queue as the value in our weak map
616             // so we null it out; to ensure there are no races with restoring it later
617             // we impose a memory ordering here (no-op on x86)
618             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
619             WeakOrderQueue queue = delayedRecycled.get(this);
620             if (queue == null) {
621                 if (delayedRecycled.size() >= maxDelayedQueues) {
622                     // Add a dummy queue so we know we should drop the object
623                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
624                     return;
625                 }
626                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
627                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
628                     // drop object
629                     return;
630                 }
631                 delayedRecycled.put(this, queue);
632             } else if (queue == WeakOrderQueue.DUMMY) {
633                 // drop object
634                 return;
635             }
636 
637             queue.add(item);
638         }
639 
640         boolean dropHandle(DefaultHandle<?> handle) {
641             if (!handle.hasBeenRecycled) {
642                 if ((++handleRecycleCount & ratioMask) != 0) {
643                     // Drop the object.
644                     return true;
645                 }
646                 handle.hasBeenRecycled = true;
647             }
648             return false;
649         }
650 
651         DefaultHandle<T> newHandle() {
652             return new DefaultHandle<T>(this);
653         }
654     }
655 }