View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31  import static java.lang.Math.max;
32  import static java.lang.Math.min;
33  
34  /**
35   * Light-weight object pool based on a thread-local stack.
36   *
37   * @param <T> the type of the pooled object
38   */
39  public abstract class Recycler<T> {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42  
43      private static final Handle NOOP_HANDLE = new Handle() { };
44      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
45      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
46      private static final int DEFAULT_INITIAL_MAX_CAPACITY = 32768; // Use 32k instances as default max capacity.
47  
48      private static final int DEFAULT_MAX_CAPACITY;
49      private static final int INITIAL_CAPACITY;
50      private static final int MAX_SHARED_CAPACITY_FACTOR;
51      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
52      private static final int LINK_CAPACITY;
53      private static final int RATIO;
54  
55      static {
56          // In the future, we might have different maxCapacity for different object types.
57          // e.g. io.netty.recycler.maxCapacity.writeTask
58          //      io.netty.recycler.maxCapacity.outboundBuffer
59          int maxCapacity = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity.default",
60                                                      DEFAULT_INITIAL_MAX_CAPACITY);
61          if (maxCapacity < 0) {
62              maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
63          }
64          DEFAULT_MAX_CAPACITY = maxCapacity;
65  
66          MAX_SHARED_CAPACITY_FACTOR = max(2,
67                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
68                          2));
69  
70          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
71                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
72                          // We use the same value as default EventLoop number
73                          NettyRuntime.availableProcessors() * 2));
74  
75          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
76                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
77  
78          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
79          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
80          // bursts.
81          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
82  
83          if (logger.isDebugEnabled()) {
84              if (DEFAULT_MAX_CAPACITY == 0) {
85                  logger.debug("-Dio.netty.recycler.maxCapacity.default: disabled");
86                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
87                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
88                  logger.debug("-Dio.netty.recycler.ratio: disabled");
89              } else {
90                  logger.debug("-Dio.netty.recycler.maxCapacity.default: {}", DEFAULT_MAX_CAPACITY);
91                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
92                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
93                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
94              }
95          }
96  
97          INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
98      }
99  
100     private final int maxCapacity;
101     private final int maxSharedCapacityFactor;
102     private final int ratioMask;
103     private final int maxDelayedQueuesPerThread;
104 
105     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
106         @Override
107         protected Stack<T> initialValue() {
108             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor,
109                     ratioMask, maxDelayedQueuesPerThread);
110         }
111     };
112 
113     protected Recycler() {
114         this(DEFAULT_MAX_CAPACITY);
115     }
116 
117     protected Recycler(int maxCapacity) {
118         this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
119     }
120 
121     protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
122         this(maxCapacity, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
123     }
124 
125     protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
126         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
127         if (maxCapacity <= 0) {
128             this.maxCapacity = 0;
129             this.maxSharedCapacityFactor = 1;
130             this.maxDelayedQueuesPerThread = 0;
131         } else {
132             this.maxCapacity = maxCapacity;
133             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
134             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
135         }
136     }
137 
138     @SuppressWarnings("unchecked")
139     public final T get() {
140         if (maxCapacity == 0) {
141             return newObject(NOOP_HANDLE);
142         }
143         Stack<T> stack = threadLocal.get();
144         DefaultHandle handle = stack.pop();
145         if (handle == null) {
146             handle = stack.newHandle();
147             handle.value = newObject(handle);
148         }
149         return (T) handle.value;
150     }
151 
152     public final boolean recycle(T o, Handle handle) {
153         if (handle == NOOP_HANDLE) {
154             return false;
155         }
156 
157         DefaultHandle h = (DefaultHandle) handle;
158         if (h.stack.parent != this) {
159             return false;
160         }
161         if (o != h.value) {
162             throw new IllegalArgumentException("o does not belong to handle");
163         }
164         h.recycle();
165         return true;
166     }
167 
168     protected abstract T newObject(Handle handle);
169 
170     final int threadLocalCapacity() {
171         return threadLocal.get().elements.length;
172     }
173 
174     final int threadLocalSize() {
175         return threadLocal.get().size;
176     }
177 
178     public interface Handle { }
179 
180     static final class DefaultHandle implements Handle {
181         private int lastRecycledId;
182         private int recycleId;
183 
184         boolean hasBeenRecycled;
185 
186         private Stack<?> stack;
187         private Object value;
188 
189         DefaultHandle(Stack<?> stack) {
190             this.stack = stack;
191         }
192 
193         public void recycle() {
194             stack.push(this);
195         }
196     }
197 
198     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
199             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
200         @Override
201         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
202             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
203         }
204     };
205 
206     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
207     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
208     private static final class WeakOrderQueue {
209 
210         static final WeakOrderQueue DUMMY = new WeakOrderQueue();
211 
212         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
213         @SuppressWarnings("serial")
214         private static final class Link extends AtomicInteger {
215             private final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
216 
217             private int readIndex;
218             private Link next;
219         }
220 
221         // chain of data items
222         private Link head, tail;
223         // pointer to another queue of delayed items for the same stack
224         private WeakOrderQueue next;
225         private final WeakReference<Thread> owner;
226         private final int id = ID_GENERATOR.getAndIncrement();
227         private final AtomicInteger availableSharedCapacity;
228 
229         private WeakOrderQueue() {
230             owner = null;
231             availableSharedCapacity = null;
232         }
233 
234         private WeakOrderQueue(Stack<?> stack, Thread thread) {
235             head = tail = new Link();
236             owner = new WeakReference<Thread>(thread);
237 
238             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
239             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
240             // Stack itself GCed.
241             availableSharedCapacity = stack.availableSharedCapacity;
242         }
243 
244         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
245             WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
246             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
247             // may be accessed while its still constructed.
248             stack.setHead(queue);
249             return queue;
250         }
251 
252         private void setNext(WeakOrderQueue next) {
253             assert next != this;
254             this.next = next;
255         }
256 
257         /**
258          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
259          */
260         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
261             // We allocated a Link so reserve the space
262             return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
263                     ? WeakOrderQueue.newQueue(stack, thread) : null;
264         }
265 
266         private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
267             assert space >= 0;
268             for (;;) {
269                 int available = availableSharedCapacity.get();
270                 if (available < space) {
271                     return false;
272                 }
273                 if (availableSharedCapacity.compareAndSet(available, available - space)) {
274                     return true;
275                 }
276             }
277         }
278 
279         private void reclaimSpace(int space) {
280             assert space >= 0;
281             availableSharedCapacity.addAndGet(space);
282         }
283 
284         void add(DefaultHandle handle) {
285             handle.lastRecycledId = id;
286 
287             Link tail = this.tail;
288             int writeIndex;
289             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
290                 if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
291                     // Drop it.
292                     return;
293                 }
294                 // We allocate a Link so reserve the space
295                 this.tail = tail = tail.next = new Link();
296 
297                 writeIndex = tail.get();
298             }
299             tail.elements[writeIndex] = handle;
300             handle.stack = null;
301             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
302             // this also means we guarantee visibility of an element in the queue if we see the index updated
303             tail.lazySet(writeIndex + 1);
304         }
305 
306         boolean hasFinalData() {
307             return tail.readIndex != tail.get();
308         }
309 
310         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
311         @SuppressWarnings("rawtypes")
312         boolean transfer(Stack<?> dst) {
313             Link head = this.head;
314             if (head == null) {
315                 return false;
316             }
317 
318             if (head.readIndex == LINK_CAPACITY) {
319                 if (head.next == null) {
320                     return false;
321                 }
322                 this.head = head = head.next;
323             }
324 
325             final int srcStart = head.readIndex;
326             int srcEnd = head.get();
327             final int srcSize = srcEnd - srcStart;
328             if (srcSize == 0) {
329                 return false;
330             }
331 
332             final int dstSize = dst.size;
333             final int expectedCapacity = dstSize + srcSize;
334 
335             if (expectedCapacity > dst.elements.length) {
336                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
337                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
338             }
339 
340             if (srcStart != srcEnd) {
341                 final DefaultHandle[] srcElems = head.elements;
342                 final DefaultHandle[] dstElems = dst.elements;
343                 int newDstSize = dstSize;
344                 for (int i = srcStart; i < srcEnd; i++) {
345                     DefaultHandle element = srcElems[i];
346                     if (element.recycleId == 0) {
347                         element.recycleId = element.lastRecycledId;
348                     } else if (element.recycleId != element.lastRecycledId) {
349                         throw new IllegalStateException("recycled already");
350                     }
351                     srcElems[i] = null;
352 
353                     if (dst.dropHandle(element)) {
354                         // Drop the object.
355                         continue;
356                     }
357                     element.stack = dst;
358                     dstElems[newDstSize ++] = element;
359                 }
360 
361                 if (srcEnd == LINK_CAPACITY && head.next != null) {
362                     // Add capacity back as the Link is GCed.
363                     reclaimSpace(LINK_CAPACITY);
364 
365                     this.head = head.next;
366                 }
367 
368                 head.readIndex = srcEnd;
369                 if (dst.size == newDstSize) {
370                     return false;
371                 }
372                 dst.size = newDstSize;
373                 return true;
374             } else {
375                 // The destination stack is full already.
376                 return false;
377             }
378         }
379 
380         @Override
381         protected void finalize() throws Throwable {
382             try {
383                 super.finalize();
384             } finally {
385                 // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
386                 // the stack. This is needed as we not have a good life-time control over the queue as it is used in a
387                 // WeakHashMap which will drop it at any time.
388                 Link link = head;
389                 while (link != null) {
390                     reclaimSpace(LINK_CAPACITY);
391                     link = link.next;
392                 }
393             }
394         }
395     }
396 
397     static final class Stack<T> {
398 
399         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
400         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
401         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
402         // still recycling all items.
403         final Recycler<T> parent;
404         final Thread thread;
405         final AtomicInteger availableSharedCapacity;
406         final int maxDelayedQueues;
407         private final int maxCapacity;
408         private final int ratioMask;
409         private DefaultHandle[] elements;
410         private int size;
411         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
412         private WeakOrderQueue cursor, prev;
413         private volatile WeakOrderQueue head;
414 
415         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
416               int ratioMask, int maxDelayedQueues) {
417             this.parent = parent;
418             this.thread = thread;
419             this.maxCapacity = maxCapacity;
420             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
421             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
422             this.ratioMask = ratioMask;
423             this.maxDelayedQueues = maxDelayedQueues;
424         }
425 
426         // Marked as synchronized to ensure this is serialized.
427         synchronized void setHead(WeakOrderQueue queue) {
428             queue.setNext(head);
429             head = queue;
430         }
431 
432         int increaseCapacity(int expectedCapacity) {
433             int newCapacity = elements.length;
434             int maxCapacity = this.maxCapacity;
435             do {
436                 newCapacity <<= 1;
437             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
438 
439             newCapacity = min(newCapacity, maxCapacity);
440             if (newCapacity != elements.length) {
441                 elements = Arrays.copyOf(elements, newCapacity);
442             }
443 
444             return newCapacity;
445         }
446 
447         DefaultHandle pop() {
448             int size = this.size;
449             if (size == 0) {
450                 if (!scavenge()) {
451                     return null;
452                 }
453                 size = this.size;
454             }
455             size --;
456             DefaultHandle ret = elements[size];
457             elements[size] = null;
458             if (ret.lastRecycledId != ret.recycleId) {
459                 throw new IllegalStateException("recycled multiple times");
460             }
461             ret.recycleId = 0;
462             ret.lastRecycledId = 0;
463             this.size = size;
464             return ret;
465         }
466 
467         boolean scavenge() {
468             // continue an existing scavenge, if any
469             if (scavengeSome()) {
470                 return true;
471             }
472 
473             // reset our scavenge cursor
474             prev = null;
475             cursor = head;
476             return false;
477         }
478 
479         boolean scavengeSome() {
480             WeakOrderQueue prev;
481             WeakOrderQueue cursor = this.cursor;
482             if (cursor == null) {
483                 prev = null;
484                 cursor = head;
485                 if (cursor == null) {
486                     return false;
487                 }
488             } else {
489                 prev = this.prev;
490             }
491 
492             boolean success = false;
493             do {
494                 if (cursor.transfer(this)) {
495                     success = true;
496                     break;
497                 }
498                 WeakOrderQueue next = cursor.next;
499                 if (cursor.owner.get() == null) {
500                     // If the thread associated with the queue is gone, unlink it, after
501                     // performing a volatile read to confirm there is no data left to collect.
502                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
503                     if (cursor.hasFinalData()) {
504                         for (;;) {
505                             if (cursor.transfer(this)) {
506                                 success = true;
507                             } else {
508                                 break;
509                             }
510                         }
511                     }
512 
513                     if (prev != null) {
514                         prev.setNext(next);
515                     }
516                 } else {
517                     prev = cursor;
518                 }
519 
520                 cursor = next;
521 
522             } while (cursor != null && !success);
523 
524             this.prev = prev;
525             this.cursor = cursor;
526             return success;
527         }
528 
529         void push(DefaultHandle item) {
530             Thread currentThread = Thread.currentThread();
531             if (thread == currentThread) {
532                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
533                 pushNow(item);
534             } else {
535                 // The current Thread is not the one that belongs to the Stack, we need to signal that the push
536                 // happens later.
537                 pushLater(item, currentThread);
538             }
539         }
540 
541         private void pushNow(DefaultHandle item) {
542             if ((item.recycleId | item.lastRecycledId) != 0) {
543                 throw new IllegalStateException("recycled already");
544             }
545             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
546 
547             int size = this.size;
548             if (size >= maxCapacity || dropHandle(item)) {
549                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
550                 return;
551             }
552             if (size == elements.length) {
553                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
554             }
555 
556             elements[size] = item;
557             this.size = size + 1;
558         }
559 
560         private void pushLater(DefaultHandle item, Thread thread) {
561             // we don't want to have a ref to the queue as the value in our weak map
562             // so we null it out; to ensure there are no races with restoring it later
563             // we impose a memory ordering here (no-op on x86)
564             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
565             WeakOrderQueue queue = delayedRecycled.get(this);
566             if (queue == null) {
567                 if (delayedRecycled.size() >= maxDelayedQueues) {
568                     // Add a dummy queue so we know we should drop the object
569                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
570                     return;
571                 }
572                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
573                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
574                     // drop object
575                     return;
576                 }
577                 delayedRecycled.put(this, queue);
578             } else if (queue == WeakOrderQueue.DUMMY) {
579                 // drop object
580                 return;
581             }
582 
583             queue.add(item);
584         }
585 
586         boolean dropHandle(DefaultHandle handle) {
587             if (!handle.hasBeenRecycled) {
588                 if ((++handleRecycleCount & ratioMask) != 0) {
589                     // Drop the object.
590                     return true;
591                 }
592                 handle.hasBeenRecycled = true;
593             }
594             return false;
595         }
596 
597         DefaultHandle newHandle() {
598             return new DefaultHandle(this);
599         }
600     }
601 }