View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42  
43      @SuppressWarnings("rawtypes")
44      private static final Handle NOOP_HANDLE = new Handle() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49      };
50      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
51      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
52      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 32768; // Use 32k instances as default.
53      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
54      private static final int INITIAL_CAPACITY;
55      private static final int MAX_SHARED_CAPACITY_FACTOR;
56      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
57      private static final int LINK_CAPACITY;
58      private static final int RATIO;
59  
60      static {
61          // In the future, we might have different maxCapacity for different object types.
62          // e.g. io.netty.recycler.maxCapacity.writeTask
63          //      io.netty.recycler.maxCapacity.outboundBuffer
64          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
65                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
66          if (maxCapacityPerThread < 0) {
67              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
68          }
69  
70          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
71  
72          MAX_SHARED_CAPACITY_FACTOR = max(2,
73                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
74                          2));
75  
76          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
77                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
78                          // We use the same value as default EventLoop number
79                          NettyRuntime.availableProcessors() * 2));
80  
81          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
82                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
83  
84          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
85          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
86          // bursts.
87          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
88  
89          if (logger.isDebugEnabled()) {
90              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
91                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
92                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
93                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
94                  logger.debug("-Dio.netty.recycler.ratio: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
98                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
99                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
100             }
101         }
102 
103         INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
104     }
105 
106     private final int maxCapacityPerThread;
107     private final int maxSharedCapacityFactor;
108     private final int ratioMask;
109     private final int maxDelayedQueuesPerThread;
110 
111     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
112         @Override
113         protected Stack<T> initialValue() {
114             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
115                     ratioMask, maxDelayedQueuesPerThread);
116         }
117     };
118 
119     protected Recycler() {
120         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
121     }
122 
123     protected Recycler(int maxCapacityPerThread) {
124         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
125     }
126 
127     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
128         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
129     }
130 
131     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
132                        int ratio, int maxDelayedQueuesPerThread) {
133         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
134         if (maxCapacityPerThread <= 0) {
135             this.maxCapacityPerThread = 0;
136             this.maxSharedCapacityFactor = 1;
137             this.maxDelayedQueuesPerThread = 0;
138         } else {
139             this.maxCapacityPerThread = maxCapacityPerThread;
140             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
141             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
142         }
143     }
144 
145     @SuppressWarnings("unchecked")
146     public final T get() {
147         if (maxCapacityPerThread == 0) {
148             return newObject((Handle<T>) NOOP_HANDLE);
149         }
150         Stack<T> stack = threadLocal.get();
151         DefaultHandle<T> handle = stack.pop();
152         if (handle == null) {
153             handle = stack.newHandle();
154             handle.value = newObject(handle);
155         }
156         return (T) handle.value;
157     }
158 
159     /**
160      * @deprecated use {@link Handle#recycle(Object)}.
161      */
162     @Deprecated
163     public final boolean recycle(T o, Handle<T> handle) {
164         if (handle == NOOP_HANDLE) {
165             return false;
166         }
167 
168         DefaultHandle<T> h = (DefaultHandle<T>) handle;
169         if (h.stack.parent != this) {
170             return false;
171         }
172 
173         h.recycle(o);
174         return true;
175     }
176 
177     final int threadLocalCapacity() {
178         return threadLocal.get().elements.length;
179     }
180 
181     final int threadLocalSize() {
182         return threadLocal.get().size;
183     }
184 
185     protected abstract T newObject(Handle<T> handle);
186 
187     public interface Handle<T> {
188         void recycle(T object);
189     }
190 
191     static final class DefaultHandle<T> implements Handle<T> {
192         private int lastRecycledId;
193         private int recycleId;
194 
195         boolean hasBeenRecycled;
196 
197         private Stack<?> stack;
198         private Object value;
199 
200         DefaultHandle(Stack<?> stack) {
201             this.stack = stack;
202         }
203 
204         @Override
205         public void recycle(Object object) {
206             if (object != value) {
207                 throw new IllegalArgumentException("object does not belong to handle");
208             }
209             stack.push(this);
210         }
211     }
212 
213     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
214             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
215         @Override
216         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
217             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
218         }
219     };
220 
221     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
222     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
223     private static final class WeakOrderQueue {
224 
225         static final WeakOrderQueue DUMMY = new WeakOrderQueue();
226 
227         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
228         @SuppressWarnings("serial")
229         private static final class Link extends AtomicInteger {
230             private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
231 
232             private int readIndex;
233             private Link next;
234         }
235 
236         // chain of data items
237         private Link head, tail;
238         // pointer to another queue of delayed items for the same stack
239         private WeakOrderQueue next;
240         private final WeakReference<Thread> owner;
241         private final int id = ID_GENERATOR.getAndIncrement();
242         private final AtomicInteger availableSharedCapacity;
243 
244         private WeakOrderQueue() {
245             owner = null;
246             availableSharedCapacity = null;
247         }
248 
249         private WeakOrderQueue(Stack<?> stack, Thread thread) {
250             head = tail = new Link();
251             owner = new WeakReference<Thread>(thread);
252 
253             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
254             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
255             // Stack itself GCed.
256             availableSharedCapacity = stack.availableSharedCapacity;
257         }
258 
259         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
260             WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
261             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
262             // may be accessed while its still constructed.
263             stack.setHead(queue);
264             return queue;
265         }
266 
267         private void setNext(WeakOrderQueue next) {
268             assert next != this;
269             this.next = next;
270         }
271 
272         /**
273          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
274          */
275         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
276             // We allocated a Link so reserve the space
277             return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
278                     ? WeakOrderQueue.newQueue(stack, thread) : null;
279         }
280 
281         private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
282             assert space >= 0;
283             for (;;) {
284                 int available = availableSharedCapacity.get();
285                 if (available < space) {
286                     return false;
287                 }
288                 if (availableSharedCapacity.compareAndSet(available, available - space)) {
289                     return true;
290                 }
291             }
292         }
293 
294         private void reclaimSpace(int space) {
295             assert space >= 0;
296             availableSharedCapacity.addAndGet(space);
297         }
298 
299         void add(DefaultHandle<?> handle) {
300             handle.lastRecycledId = id;
301 
302             Link tail = this.tail;
303             int writeIndex;
304             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
305                 if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
306                     // Drop it.
307                     return;
308                 }
309                 // We allocate a Link so reserve the space
310                 this.tail = tail = tail.next = new Link();
311 
312                 writeIndex = tail.get();
313             }
314             tail.elements[writeIndex] = handle;
315             handle.stack = null;
316             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
317             // this also means we guarantee visibility of an element in the queue if we see the index updated
318             tail.lazySet(writeIndex + 1);
319         }
320 
321         boolean hasFinalData() {
322             return tail.readIndex != tail.get();
323         }
324 
325         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
326         @SuppressWarnings("rawtypes")
327         boolean transfer(Stack<?> dst) {
328             Link head = this.head;
329             if (head == null) {
330                 return false;
331             }
332 
333             if (head.readIndex == LINK_CAPACITY) {
334                 if (head.next == null) {
335                     return false;
336                 }
337                 this.head = head = head.next;
338             }
339 
340             final int srcStart = head.readIndex;
341             int srcEnd = head.get();
342             final int srcSize = srcEnd - srcStart;
343             if (srcSize == 0) {
344                 return false;
345             }
346 
347             final int dstSize = dst.size;
348             final int expectedCapacity = dstSize + srcSize;
349 
350             if (expectedCapacity > dst.elements.length) {
351                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
352                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
353             }
354 
355             if (srcStart != srcEnd) {
356                 final DefaultHandle[] srcElems = head.elements;
357                 final DefaultHandle[] dstElems = dst.elements;
358                 int newDstSize = dstSize;
359                 for (int i = srcStart; i < srcEnd; i++) {
360                     DefaultHandle element = srcElems[i];
361                     if (element.recycleId == 0) {
362                         element.recycleId = element.lastRecycledId;
363                     } else if (element.recycleId != element.lastRecycledId) {
364                         throw new IllegalStateException("recycled already");
365                     }
366                     srcElems[i] = null;
367 
368                     if (dst.dropHandle(element)) {
369                         // Drop the object.
370                         continue;
371                     }
372                     element.stack = dst;
373                     dstElems[newDstSize ++] = element;
374                 }
375 
376                 if (srcEnd == LINK_CAPACITY && head.next != null) {
377                     // Add capacity back as the Link is GCed.
378                     reclaimSpace(LINK_CAPACITY);
379 
380                     this.head = head.next;
381                 }
382 
383                 head.readIndex = srcEnd;
384                 if (dst.size == newDstSize) {
385                     return false;
386                 }
387                 dst.size = newDstSize;
388                 return true;
389             } else {
390                 // The destination stack is full already.
391                 return false;
392             }
393         }
394 
395         @Override
396         protected void finalize() throws Throwable {
397             try {
398                 super.finalize();
399             } finally {
400                 // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
401                 // the stack. This is needed as we not have a good life-time control over the queue as it is used in a
402                 // WeakHashMap which will drop it at any time.
403                 Link link = head;
404                 while (link != null) {
405                     reclaimSpace(LINK_CAPACITY);
406                     link = link.next;
407                 }
408             }
409         }
410     }
411 
412     static final class Stack<T> {
413 
414         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
415         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
416         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
417         // still recycling all items.
418         final Recycler<T> parent;
419 
420         // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
421         // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
422         //
423         // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
424         // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
425         // it in a timely manner).
426         final WeakReference<Thread> threadRef;
427         final AtomicInteger availableSharedCapacity;
428         final int maxDelayedQueues;
429 
430         private final int maxCapacity;
431         private final int ratioMask;
432         private DefaultHandle<?>[] elements;
433         private int size;
434         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
435         private WeakOrderQueue cursor, prev;
436         private volatile WeakOrderQueue head;
437 
438         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
439               int ratioMask, int maxDelayedQueues) {
440             this.parent = parent;
441             threadRef = new WeakReference<Thread>(thread);
442             this.maxCapacity = maxCapacity;
443             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
444             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
445             this.ratioMask = ratioMask;
446             this.maxDelayedQueues = maxDelayedQueues;
447         }
448 
449         // Marked as synchronized to ensure this is serialized.
450         synchronized void setHead(WeakOrderQueue queue) {
451             queue.setNext(head);
452             head = queue;
453         }
454 
455         int increaseCapacity(int expectedCapacity) {
456             int newCapacity = elements.length;
457             int maxCapacity = this.maxCapacity;
458             do {
459                 newCapacity <<= 1;
460             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
461 
462             newCapacity = min(newCapacity, maxCapacity);
463             if (newCapacity != elements.length) {
464                 elements = Arrays.copyOf(elements, newCapacity);
465             }
466 
467             return newCapacity;
468         }
469 
470         @SuppressWarnings({ "unchecked", "rawtypes" })
471         DefaultHandle<T> pop() {
472             int size = this.size;
473             if (size == 0) {
474                 if (!scavenge()) {
475                     return null;
476                 }
477                 size = this.size;
478             }
479             size --;
480             DefaultHandle ret = elements[size];
481             elements[size] = null;
482             if (ret.lastRecycledId != ret.recycleId) {
483                 throw new IllegalStateException("recycled multiple times");
484             }
485             ret.recycleId = 0;
486             ret.lastRecycledId = 0;
487             this.size = size;
488             return ret;
489         }
490 
491         boolean scavenge() {
492             // continue an existing scavenge, if any
493             if (scavengeSome()) {
494                 return true;
495             }
496 
497             // reset our scavenge cursor
498             prev = null;
499             cursor = head;
500             return false;
501         }
502 
503         boolean scavengeSome() {
504             WeakOrderQueue prev;
505             WeakOrderQueue cursor = this.cursor;
506             if (cursor == null) {
507                 prev = null;
508                 cursor = head;
509                 if (cursor == null) {
510                     return false;
511                 }
512             } else {
513                 prev = this.prev;
514             }
515 
516             boolean success = false;
517             do {
518                 if (cursor.transfer(this)) {
519                     success = true;
520                     break;
521                 }
522                 WeakOrderQueue next = cursor.next;
523                 if (cursor.owner.get() == null) {
524                     // If the thread associated with the queue is gone, unlink it, after
525                     // performing a volatile read to confirm there is no data left to collect.
526                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
527                     if (cursor.hasFinalData()) {
528                         for (;;) {
529                             if (cursor.transfer(this)) {
530                                 success = true;
531                             } else {
532                                 break;
533                             }
534                         }
535                     }
536 
537                     if (prev != null) {
538                         prev.setNext(next);
539                     }
540                 } else {
541                     prev = cursor;
542                 }
543 
544                 cursor = next;
545 
546             } while (cursor != null && !success);
547 
548             this.prev = prev;
549             this.cursor = cursor;
550             return success;
551         }
552 
553         void push(DefaultHandle<?> item) {
554             Thread currentThread = Thread.currentThread();
555             if (threadRef.get() == currentThread) {
556                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
557                 pushNow(item);
558             } else {
559                 // The current Thread is not the one that belongs to the Stack
560                 // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
561                 // happens later.
562                 pushLater(item, currentThread);
563             }
564         }
565 
566         private void pushNow(DefaultHandle<?> item) {
567             if ((item.recycleId | item.lastRecycledId) != 0) {
568                 throw new IllegalStateException("recycled already");
569             }
570             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
571 
572             int size = this.size;
573             if (size >= maxCapacity || dropHandle(item)) {
574                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
575                 return;
576             }
577             if (size == elements.length) {
578                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
579             }
580 
581             elements[size] = item;
582             this.size = size + 1;
583         }
584 
585         private void pushLater(DefaultHandle<?> item, Thread thread) {
586             // we don't want to have a ref to the queue as the value in our weak map
587             // so we null it out; to ensure there are no races with restoring it later
588             // we impose a memory ordering here (no-op on x86)
589             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
590             WeakOrderQueue queue = delayedRecycled.get(this);
591             if (queue == null) {
592                 if (delayedRecycled.size() >= maxDelayedQueues) {
593                     // Add a dummy queue so we know we should drop the object
594                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
595                     return;
596                 }
597                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
598                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
599                     // drop object
600                     return;
601                 }
602                 delayedRecycled.put(this, queue);
603             } else if (queue == WeakOrderQueue.DUMMY) {
604                 // drop object
605                 return;
606             }
607 
608             queue.add(item);
609         }
610 
611         boolean dropHandle(DefaultHandle<?> handle) {
612             if (!handle.hasBeenRecycled) {
613                 if ((++handleRecycleCount & ratioMask) != 0) {
614                     // Drop the object.
615                     return true;
616                 }
617                 handle.hasBeenRecycled = true;
618             }
619             return false;
620         }
621 
622         DefaultHandle<T> newHandle() {
623             return new DefaultHandle<T>(this);
624         }
625     }
626 }