View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42  
43      private static final Handle NOOP_HANDLE = new Handle() { };
44      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
45      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
46      private static final int DEFAULT_INITIAL_MAX_CAPACITY = 32768; // Use 32k instances as default max capacity.
47  
48      private static final int DEFAULT_MAX_CAPACITY;
49      private static final int INITIAL_CAPACITY;
50      private static final int MAX_SHARED_CAPACITY_FACTOR;
51      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
52      private static final int LINK_CAPACITY;
53      private static final int RATIO;
54  
55      static {
56          // In the future, we might have different maxCapacity for different object types.
57          // e.g. io.netty.recycler.maxCapacity.writeTask
58          //      io.netty.recycler.maxCapacity.outboundBuffer
59          int maxCapacity = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity.default",
60                                                      DEFAULT_INITIAL_MAX_CAPACITY);
61          if (maxCapacity < 0) {
62              maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
63          }
64          DEFAULT_MAX_CAPACITY = maxCapacity;
65  
66          MAX_SHARED_CAPACITY_FACTOR = max(2,
67                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
68                          2));
69  
70          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
71                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
72                          // We use the same value as default EventLoop number
73                          NettyRuntime.availableProcessors() * 2));
74  
75          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
76                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
77  
78          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
79          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
80          // bursts.
81          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
82  
83          if (logger.isDebugEnabled()) {
84              if (DEFAULT_MAX_CAPACITY == 0) {
85                  logger.debug("-Dio.netty.recycler.maxCapacity.default: disabled");
86                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
87                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
88                  logger.debug("-Dio.netty.recycler.ratio: disabled");
89              } else {
90                  logger.debug("-Dio.netty.recycler.maxCapacity.default: {}", DEFAULT_MAX_CAPACITY);
91                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
92                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
93                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
94              }
95          }
96  
97          INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
98      }
99  
100     private final int maxCapacity;
101     private final int maxSharedCapacityFactor;
102     private final int ratioMask;
103     private final int maxDelayedQueuesPerThread;
104 
105     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
106         @Override
107         protected Stack<T> initialValue() {
108             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor,
109                     ratioMask, maxDelayedQueuesPerThread);
110         }
111 
112         @Override
113         protected void onRemoval(Stack<T> value) {
114             // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
115             if (value.threadRef.get() == Thread.currentThread()) {
116                if (DELAYED_RECYCLED.isSet()) {
117                    DELAYED_RECYCLED.get().remove(value);
118                }
119             }
120         }
121     };
122 
123     protected Recycler() {
124         this(DEFAULT_MAX_CAPACITY);
125     }
126 
127     protected Recycler(int maxCapacity) {
128         this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
129     }
130 
131     protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
132         this(maxCapacity, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
133     }
134 
135     protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
136         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
137         if (maxCapacity <= 0) {
138             this.maxCapacity = 0;
139             this.maxSharedCapacityFactor = 1;
140             this.maxDelayedQueuesPerThread = 0;
141         } else {
142             this.maxCapacity = maxCapacity;
143             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
144             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
145         }
146     }
147 
148     @SuppressWarnings("unchecked")
149     public final T get() {
150         if (maxCapacity == 0) {
151             return newObject(NOOP_HANDLE);
152         }
153         Stack<T> stack = threadLocal.get();
154         DefaultHandle handle = stack.pop();
155         if (handle == null) {
156             handle = stack.newHandle();
157             handle.value = newObject(handle);
158         }
159         return (T) handle.value;
160     }
161 
162     public final boolean recycle(T o, Handle handle) {
163         if (handle == NOOP_HANDLE) {
164             return false;
165         }
166 
167         DefaultHandle h = (DefaultHandle) handle;
168         if (h.stack.parent != this) {
169             return false;
170         }
171         if (o != h.value) {
172             throw new IllegalArgumentException("o does not belong to handle");
173         }
174         h.recycle();
175         return true;
176     }
177 
178     protected abstract T newObject(Handle handle);
179 
180     final int threadLocalCapacity() {
181         return threadLocal.get().elements.length;
182     }
183 
184     final int threadLocalSize() {
185         return threadLocal.get().size;
186     }
187 
188     public interface Handle { }
189 
190     static final class DefaultHandle implements Handle {
191         private int lastRecycledId;
192         private int recycleId;
193 
194         boolean hasBeenRecycled;
195 
196         private Stack<?> stack;
197         private Object value;
198 
199         DefaultHandle(Stack<?> stack) {
200             this.stack = stack;
201         }
202 
203         public void recycle() {
204             stack.push(this);
205         }
206     }
207 
208     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
209             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
210         @Override
211         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
212             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
213         }
214     };
215 
216     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
217     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
218     private static final class WeakOrderQueue {
219 
220         static final WeakOrderQueue DUMMY = new WeakOrderQueue();
221 
222         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
223         @SuppressWarnings("serial")
224         private static final class Link extends AtomicInteger {
225             private final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
226 
227             private int readIndex;
228             private Link next;
229         }
230 
231         // chain of data items
232         private Link head, tail;
233         // pointer to another queue of delayed items for the same stack
234         private WeakOrderQueue next;
235         private final WeakReference<Thread> owner;
236         private final int id = ID_GENERATOR.getAndIncrement();
237         private final AtomicInteger availableSharedCapacity;
238 
239         private WeakOrderQueue() {
240             owner = null;
241             availableSharedCapacity = null;
242         }
243 
244         private WeakOrderQueue(Stack<?> stack, Thread thread) {
245             head = tail = new Link();
246             owner = new WeakReference<Thread>(thread);
247 
248             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
249             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
250             // Stack itself GCed.
251             availableSharedCapacity = stack.availableSharedCapacity;
252         }
253 
254         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
255             WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
256             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
257             // may be accessed while its still constructed.
258             stack.setHead(queue);
259             return queue;
260         }
261 
262         private void setNext(WeakOrderQueue next) {
263             assert next != this;
264             this.next = next;
265         }
266 
267         /**
268          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
269          */
270         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
271             // We allocated a Link so reserve the space
272             return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
273                     ? newQueue(stack, thread) : null;
274         }
275 
276         private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
277             assert space >= 0;
278             for (;;) {
279                 int available = availableSharedCapacity.get();
280                 if (available < space) {
281                     return false;
282                 }
283                 if (availableSharedCapacity.compareAndSet(available, available - space)) {
284                     return true;
285                 }
286             }
287         }
288 
289         private void reclaimSpace(int space) {
290             assert space >= 0;
291             availableSharedCapacity.addAndGet(space);
292         }
293 
294         void add(DefaultHandle handle) {
295             handle.lastRecycledId = id;
296 
297             Link tail = this.tail;
298             int writeIndex;
299             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
300                 if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
301                     // Drop it.
302                     return;
303                 }
304                 // We allocate a Link so reserve the space
305                 this.tail = tail = tail.next = new Link();
306 
307                 writeIndex = tail.get();
308             }
309             tail.elements[writeIndex] = handle;
310             handle.stack = null;
311             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
312             // this also means we guarantee visibility of an element in the queue if we see the index updated
313             tail.lazySet(writeIndex + 1);
314         }
315 
316         boolean hasFinalData() {
317             return tail.readIndex != tail.get();
318         }
319 
320         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
321         @SuppressWarnings("rawtypes")
322         boolean transfer(Stack<?> dst) {
323             Link head = this.head;
324             if (head == null) {
325                 return false;
326             }
327 
328             if (head.readIndex == LINK_CAPACITY) {
329                 if (head.next == null) {
330                     return false;
331                 }
332                 this.head = head = head.next;
333             }
334 
335             final int srcStart = head.readIndex;
336             int srcEnd = head.get();
337             final int srcSize = srcEnd - srcStart;
338             if (srcSize == 0) {
339                 return false;
340             }
341 
342             final int dstSize = dst.size;
343             final int expectedCapacity = dstSize + srcSize;
344 
345             if (expectedCapacity > dst.elements.length) {
346                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
347                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
348             }
349 
350             if (srcStart != srcEnd) {
351                 final DefaultHandle[] srcElems = head.elements;
352                 final DefaultHandle[] dstElems = dst.elements;
353                 int newDstSize = dstSize;
354                 for (int i = srcStart; i < srcEnd; i++) {
355                     DefaultHandle element = srcElems[i];
356                     if (element.recycleId == 0) {
357                         element.recycleId = element.lastRecycledId;
358                     } else if (element.recycleId != element.lastRecycledId) {
359                         throw new IllegalStateException("recycled already");
360                     }
361                     srcElems[i] = null;
362 
363                     if (dst.dropHandle(element)) {
364                         // Drop the object.
365                         continue;
366                     }
367                     element.stack = dst;
368                     dstElems[newDstSize ++] = element;
369                 }
370 
371                 if (srcEnd == LINK_CAPACITY && head.next != null) {
372                     // Add capacity back as the Link is GCed.
373                     reclaimSpace(LINK_CAPACITY);
374 
375                     this.head = head.next;
376                 }
377 
378                 head.readIndex = srcEnd;
379                 if (dst.size == newDstSize) {
380                     return false;
381                 }
382                 dst.size = newDstSize;
383                 return true;
384             } else {
385                 // The destination stack is full already.
386                 return false;
387             }
388         }
389 
390         @Override
391         protected void finalize() throws Throwable {
392             try {
393                 super.finalize();
394             } finally {
395                 // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
396                 // the stack. This is needed as we not have a good life-time control over the queue as it is used in a
397                 // WeakHashMap which will drop it at any time.
398                 Link link = head;
399                 while (link != null) {
400                     reclaimSpace(LINK_CAPACITY);
401                     link = link.next;
402                 }
403             }
404         }
405     }
406 
407     static final class Stack<T> {
408 
409         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
410         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
411         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
412         // still recycling all items.
413         final Recycler<T> parent;
414 
415         // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
416         // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
417         //
418         // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
419         // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
420         // it in a timely manner).
421         final WeakReference<Thread> threadRef;
422         final AtomicInteger availableSharedCapacity;
423         final int maxDelayedQueues;
424         private final int maxCapacity;
425         private final int ratioMask;
426         private DefaultHandle[] elements;
427         private int size;
428         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
429         private WeakOrderQueue cursor, prev;
430         private volatile WeakOrderQueue head;
431 
432         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
433               int ratioMask, int maxDelayedQueues) {
434             this.parent = parent;
435             threadRef = new WeakReference<Thread>(thread);
436             this.maxCapacity = maxCapacity;
437             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
438             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
439             this.ratioMask = ratioMask;
440             this.maxDelayedQueues = maxDelayedQueues;
441         }
442 
443         // Marked as synchronized to ensure this is serialized.
444         synchronized void setHead(WeakOrderQueue queue) {
445             queue.setNext(head);
446             head = queue;
447         }
448 
449         int increaseCapacity(int expectedCapacity) {
450             int newCapacity = elements.length;
451             int maxCapacity = this.maxCapacity;
452             do {
453                 newCapacity <<= 1;
454             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
455 
456             newCapacity = min(newCapacity, maxCapacity);
457             if (newCapacity != elements.length) {
458                 elements = Arrays.copyOf(elements, newCapacity);
459             }
460 
461             return newCapacity;
462         }
463 
464         DefaultHandle pop() {
465             int size = this.size;
466             if (size == 0) {
467                 if (!scavenge()) {
468                     return null;
469                 }
470                 size = this.size;
471             }
472             size --;
473             DefaultHandle ret = elements[size];
474             elements[size] = null;
475             if (ret.lastRecycledId != ret.recycleId) {
476                 throw new IllegalStateException("recycled multiple times");
477             }
478             ret.recycleId = 0;
479             ret.lastRecycledId = 0;
480             this.size = size;
481             return ret;
482         }
483 
484         boolean scavenge() {
485             // continue an existing scavenge, if any
486             if (scavengeSome()) {
487                 return true;
488             }
489 
490             // reset our scavenge cursor
491             prev = null;
492             cursor = head;
493             return false;
494         }
495 
496         boolean scavengeSome() {
497             WeakOrderQueue prev;
498             WeakOrderQueue cursor = this.cursor;
499             if (cursor == null) {
500                 prev = null;
501                 cursor = head;
502                 if (cursor == null) {
503                     return false;
504                 }
505             } else {
506                 prev = this.prev;
507             }
508 
509             boolean success = false;
510             do {
511                 if (cursor.transfer(this)) {
512                     success = true;
513                     break;
514                 }
515                 WeakOrderQueue next = cursor.next;
516                 if (cursor.owner.get() == null) {
517                     // If the thread associated with the queue is gone, unlink it, after
518                     // performing a volatile read to confirm there is no data left to collect.
519                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
520                     if (cursor.hasFinalData()) {
521                         for (;;) {
522                             if (cursor.transfer(this)) {
523                                 success = true;
524                             } else {
525                                 break;
526                             }
527                         }
528                     }
529 
530                     if (prev != null) {
531                         prev.setNext(next);
532                     }
533                 } else {
534                     prev = cursor;
535                 }
536 
537                 cursor = next;
538 
539             } while (cursor != null && !success);
540 
541             this.prev = prev;
542             this.cursor = cursor;
543             return success;
544         }
545 
546         void push(DefaultHandle item) {
547             Thread currentThread = Thread.currentThread();
548             if (threadRef.get() == currentThread) {
549                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
550                 pushNow(item);
551             } else {
552                 // The current Thread is not the one that belongs to the Stack
553                 // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
554                 // happens later.
555                 pushLater(item, currentThread);
556             }
557         }
558 
559         private void pushNow(DefaultHandle item) {
560             if ((item.recycleId | item.lastRecycledId) != 0) {
561                 throw new IllegalStateException("recycled already");
562             }
563             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
564 
565             int size = this.size;
566             if (size >= maxCapacity || dropHandle(item)) {
567                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
568                 return;
569             }
570             if (size == elements.length) {
571                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
572             }
573 
574             elements[size] = item;
575             this.size = size + 1;
576         }
577 
578         private void pushLater(DefaultHandle item, Thread thread) {
579             // we don't want to have a ref to the queue as the value in our weak map
580             // so we null it out; to ensure there are no races with restoring it later
581             // we impose a memory ordering here (no-op on x86)
582             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
583             WeakOrderQueue queue = delayedRecycled.get(this);
584             if (queue == null) {
585                 if (delayedRecycled.size() >= maxDelayedQueues) {
586                     // Add a dummy queue so we know we should drop the object
587                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
588                     return;
589                 }
590                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
591                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
592                     // drop object
593                     return;
594                 }
595                 delayedRecycled.put(this, queue);
596             } else if (queue == WeakOrderQueue.DUMMY) {
597                 // drop object
598                 return;
599             }
600 
601             queue.add(item);
602         }
603 
604         boolean dropHandle(DefaultHandle handle) {
605             if (!handle.hasBeenRecycled) {
606                 if ((++handleRecycleCount & ratioMask) != 0) {
607                     // Drop the object.
608                     return true;
609                 }
610                 handle.hasBeenRecycled = true;
611             }
612             return false;
613         }
614 
615         DefaultHandle newHandle() {
616             return new DefaultHandle(this);
617         }
618     }
619 }