View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.ObjectCleaner;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.lang.ref.WeakReference;
26  import java.util.Arrays;
27  import java.util.Map;
28  import java.util.WeakHashMap;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
32  import static java.lang.Math.max;
33  import static java.lang.Math.min;
34  
35  /**
36   * Light-weight object pool based on a thread-local stack.
37   *
38   * @param <T> the type of the pooled object
39   */
40  public abstract class Recycler<T> {
41  
42      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
43  
44      @SuppressWarnings("rawtypes")
45      private static final Handle NOOP_HANDLE = new Handle() {
46          @Override
47          public void recycle(Object object) {
48              // NOOP
49          }
50      };
51      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
52      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
53      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
54      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
55      private static final int INITIAL_CAPACITY;
56      private static final int MAX_SHARED_CAPACITY_FACTOR;
57      private static final int MAX_DELAYED_QUEUES_PER_THREAD;
58      private static final int LINK_CAPACITY;
59      private static final int RATIO;
60  
61      static {
62          // In the future, we might have different maxCapacity for different object types.
63          // e.g. io.netty.recycler.maxCapacity.writeTask
64          //      io.netty.recycler.maxCapacity.outboundBuffer
65          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
66                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
67          if (maxCapacityPerThread < 0) {
68              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
69          }
70  
71          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
72  
73          MAX_SHARED_CAPACITY_FACTOR = max(2,
74                  SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
75                          2));
76  
77          MAX_DELAYED_QUEUES_PER_THREAD = max(0,
78                  SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
79                          // We use the same value as default EventLoop number
80                          NettyRuntime.availableProcessors() * 2));
81  
82          LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
83                  max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
84  
85          // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
86          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
87          // bursts.
88          RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
89  
90          if (logger.isDebugEnabled()) {
91              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
92                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
93                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
94                  logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
95                  logger.debug("-Dio.netty.recycler.ratio: disabled");
96              } else {
97                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
98                  logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
99                  logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
100                 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
101             }
102         }
103 
104         INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
105     }
106 
107     private final int maxCapacityPerThread;
108     private final int maxSharedCapacityFactor;
109     private final int ratioMask;
110     private final int maxDelayedQueuesPerThread;
111 
112     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
113         @Override
114         protected Stack<T> initialValue() {
115             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
116                     ratioMask, maxDelayedQueuesPerThread);
117         }
118 
119         @Override
120         protected void onRemoval(Stack<T> value) {
121             // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
122             if (value.threadRef.get() == Thread.currentThread()) {
123                if (DELAYED_RECYCLED.isSet()) {
124                    DELAYED_RECYCLED.get().remove(value);
125                }
126             }
127         }
128     };
129 
130     protected Recycler() {
131         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
132     }
133 
134     protected Recycler(int maxCapacityPerThread) {
135         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
136     }
137 
138     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
140     }
141 
142     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
143                        int ratio, int maxDelayedQueuesPerThread) {
144         ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
145         if (maxCapacityPerThread <= 0) {
146             this.maxCapacityPerThread = 0;
147             this.maxSharedCapacityFactor = 1;
148             this.maxDelayedQueuesPerThread = 0;
149         } else {
150             this.maxCapacityPerThread = maxCapacityPerThread;
151             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
152             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
153         }
154     }
155 
156     @SuppressWarnings("unchecked")
157     public final T get() {
158         if (maxCapacityPerThread == 0) {
159             return newObject((Handle<T>) NOOP_HANDLE);
160         }
161         Stack<T> stack = threadLocal.get();
162         DefaultHandle<T> handle = stack.pop();
163         if (handle == null) {
164             handle = stack.newHandle();
165             handle.value = newObject(handle);
166         }
167         return (T) handle.value;
168     }
169 
170     /**
171      * @deprecated use {@link Handle#recycle(Object)}.
172      */
173     @Deprecated
174     public final boolean recycle(T o, Handle<T> handle) {
175         if (handle == NOOP_HANDLE) {
176             return false;
177         }
178 
179         DefaultHandle<T> h = (DefaultHandle<T>) handle;
180         if (h.stack.parent != this) {
181             return false;
182         }
183 
184         h.recycle(o);
185         return true;
186     }
187 
188     final int threadLocalCapacity() {
189         return threadLocal.get().elements.length;
190     }
191 
192     final int threadLocalSize() {
193         return threadLocal.get().size;
194     }
195 
196     protected abstract T newObject(Handle<T> handle);
197 
198     public interface Handle<T> {
199         void recycle(T object);
200     }
201 
202     static final class DefaultHandle<T> implements Handle<T> {
203         private int lastRecycledId;
204         private int recycleId;
205 
206         boolean hasBeenRecycled;
207 
208         private Stack<?> stack;
209         private Object value;
210 
211         DefaultHandle(Stack<?> stack) {
212             this.stack = stack;
213         }
214 
215         @Override
216         public void recycle(Object object) {
217             if (object != value) {
218                 throw new IllegalArgumentException("object does not belong to handle");
219             }
220             stack.push(this);
221         }
222     }
223 
224     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
225             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
226         @Override
227         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
228             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
229         }
230     };
231 
232     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
233     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
234     private static final class WeakOrderQueue {
235 
236         static final WeakOrderQueue DUMMY = new WeakOrderQueue();
237 
238         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
239         @SuppressWarnings("serial")
240         static final class Link extends AtomicInteger {
241             private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
242 
243             private int readIndex;
244             Link next;
245         }
246 
247         // This act as a place holder for the head Link but also will be used by the ObjectCleaner
248         // to return space that was before reserved. Its important this does not hold any reference to
249         // either Stack or WeakOrderQueue.
250         static final class Head implements Runnable {
251             private final AtomicInteger availableSharedCapacity;
252 
253             Link link;
254 
255             Head(AtomicInteger availableSharedCapacity) {
256                 this.availableSharedCapacity = availableSharedCapacity;
257             }
258 
259             @Override
260             public void run() {
261                 Link head = link;
262                 while (head != null) {
263                     reclaimSpace(LINK_CAPACITY);
264                     head = head.next;
265                 }
266             }
267 
268             void reclaimSpace(int space) {
269                 assert space >= 0;
270                 availableSharedCapacity.addAndGet(space);
271             }
272 
273             boolean reserveSpace(int space) {
274                 return reserveSpace(availableSharedCapacity, space);
275             }
276 
277             static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
278                 assert space >= 0;
279                 for (;;) {
280                     int available = availableSharedCapacity.get();
281                     if (available < space) {
282                         return false;
283                     }
284                     if (availableSharedCapacity.compareAndSet(available, available - space)) {
285                         return true;
286                     }
287                 }
288             }
289         }
290 
291         // chain of data items
292         private final Head head;
293         private Link tail;
294         // pointer to another queue of delayed items for the same stack
295         private WeakOrderQueue next;
296         private final WeakReference<Thread> owner;
297         private final int id = ID_GENERATOR.getAndIncrement();
298 
299         private WeakOrderQueue() {
300             owner = null;
301             head = new Head(null);
302         }
303 
304         private WeakOrderQueue(Stack<?> stack, Thread thread) {
305             tail = new Link();
306 
307             // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
308             // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
309             // Stack itself GCed.
310             head = new Head(stack.availableSharedCapacity);
311             head.link = tail;
312             owner = new WeakReference<Thread>(thread);
313         }
314 
315         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
316             final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
317             // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
318             // may be accessed while its still constructed.
319             stack.setHead(queue);
320 
321             // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
322             // the stack. This is needed as we not have a good life-time control over the queue as it is used in a
323             // WeakHashMap which will drop it at any time.
324             final Head head = queue.head;
325             ObjectCleaner.register(queue, head);
326 
327             return queue;
328         }
329 
330         private void setNext(WeakOrderQueue next) {
331             assert next != this;
332             this.next = next;
333         }
334 
335         /**
336          * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
337          */
338         static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
339             // We allocated a Link so reserve the space
340             return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
341                     ? newQueue(stack, thread) : null;
342         }
343 
344         void add(DefaultHandle<?> handle) {
345             handle.lastRecycledId = id;
346 
347             Link tail = this.tail;
348             int writeIndex;
349             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
350                 if (!head.reserveSpace(LINK_CAPACITY)) {
351                     // Drop it.
352                     return;
353                 }
354                 // We allocate a Link so reserve the space
355                 this.tail = tail = tail.next = new Link();
356 
357                 writeIndex = tail.get();
358             }
359             tail.elements[writeIndex] = handle;
360             handle.stack = null;
361             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
362             // this also means we guarantee visibility of an element in the queue if we see the index updated
363             tail.lazySet(writeIndex + 1);
364         }
365 
366         boolean hasFinalData() {
367             return tail.readIndex != tail.get();
368         }
369 
370         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
371         @SuppressWarnings("rawtypes")
372         boolean transfer(Stack<?> dst) {
373             Link head = this.head.link;
374             if (head == null) {
375                 return false;
376             }
377 
378             if (head.readIndex == LINK_CAPACITY) {
379                 if (head.next == null) {
380                     return false;
381                 }
382                 this.head.link = head = head.next;
383             }
384 
385             final int srcStart = head.readIndex;
386             int srcEnd = head.get();
387             final int srcSize = srcEnd - srcStart;
388             if (srcSize == 0) {
389                 return false;
390             }
391 
392             final int dstSize = dst.size;
393             final int expectedCapacity = dstSize + srcSize;
394 
395             if (expectedCapacity > dst.elements.length) {
396                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
397                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
398             }
399 
400             if (srcStart != srcEnd) {
401                 final DefaultHandle[] srcElems = head.elements;
402                 final DefaultHandle[] dstElems = dst.elements;
403                 int newDstSize = dstSize;
404                 for (int i = srcStart; i < srcEnd; i++) {
405                     DefaultHandle element = srcElems[i];
406                     if (element.recycleId == 0) {
407                         element.recycleId = element.lastRecycledId;
408                     } else if (element.recycleId != element.lastRecycledId) {
409                         throw new IllegalStateException("recycled already");
410                     }
411                     srcElems[i] = null;
412 
413                     if (dst.dropHandle(element)) {
414                         // Drop the object.
415                         continue;
416                     }
417                     element.stack = dst;
418                     dstElems[newDstSize ++] = element;
419                 }
420 
421                 if (srcEnd == LINK_CAPACITY && head.next != null) {
422                     // Add capacity back as the Link is GCed.
423                     this.head.reclaimSpace(LINK_CAPACITY);
424                     this.head.link = head.next;
425                 }
426 
427                 head.readIndex = srcEnd;
428                 if (dst.size == newDstSize) {
429                     return false;
430                 }
431                 dst.size = newDstSize;
432                 return true;
433             } else {
434                 // The destination stack is full already.
435                 return false;
436             }
437         }
438     }
439 
440     static final class Stack<T> {
441 
442         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
443         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
444         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
445         // still recycling all items.
446         final Recycler<T> parent;
447 
448         // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
449         // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
450         //
451         // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
452         // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
453         // it in a timely manner).
454         final WeakReference<Thread> threadRef;
455         final AtomicInteger availableSharedCapacity;
456         final int maxDelayedQueues;
457 
458         private final int maxCapacity;
459         private final int ratioMask;
460         private DefaultHandle<?>[] elements;
461         private int size;
462         private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
463         private WeakOrderQueue cursor, prev;
464         private volatile WeakOrderQueue head;
465 
466         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
467               int ratioMask, int maxDelayedQueues) {
468             this.parent = parent;
469             threadRef = new WeakReference<Thread>(thread);
470             this.maxCapacity = maxCapacity;
471             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
472             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
473             this.ratioMask = ratioMask;
474             this.maxDelayedQueues = maxDelayedQueues;
475         }
476 
477         // Marked as synchronized to ensure this is serialized.
478         synchronized void setHead(WeakOrderQueue queue) {
479             queue.setNext(head);
480             head = queue;
481         }
482 
483         int increaseCapacity(int expectedCapacity) {
484             int newCapacity = elements.length;
485             int maxCapacity = this.maxCapacity;
486             do {
487                 newCapacity <<= 1;
488             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
489 
490             newCapacity = min(newCapacity, maxCapacity);
491             if (newCapacity != elements.length) {
492                 elements = Arrays.copyOf(elements, newCapacity);
493             }
494 
495             return newCapacity;
496         }
497 
498         @SuppressWarnings({ "unchecked", "rawtypes" })
499         DefaultHandle<T> pop() {
500             int size = this.size;
501             if (size == 0) {
502                 if (!scavenge()) {
503                     return null;
504                 }
505                 size = this.size;
506             }
507             size --;
508             DefaultHandle ret = elements[size];
509             elements[size] = null;
510             if (ret.lastRecycledId != ret.recycleId) {
511                 throw new IllegalStateException("recycled multiple times");
512             }
513             ret.recycleId = 0;
514             ret.lastRecycledId = 0;
515             this.size = size;
516             return ret;
517         }
518 
519         boolean scavenge() {
520             // continue an existing scavenge, if any
521             if (scavengeSome()) {
522                 return true;
523             }
524 
525             // reset our scavenge cursor
526             prev = null;
527             cursor = head;
528             return false;
529         }
530 
531         boolean scavengeSome() {
532             WeakOrderQueue prev;
533             WeakOrderQueue cursor = this.cursor;
534             if (cursor == null) {
535                 prev = null;
536                 cursor = head;
537                 if (cursor == null) {
538                     return false;
539                 }
540             } else {
541                 prev = this.prev;
542             }
543 
544             boolean success = false;
545             do {
546                 if (cursor.transfer(this)) {
547                     success = true;
548                     break;
549                 }
550                 WeakOrderQueue next = cursor.next;
551                 if (cursor.owner.get() == null) {
552                     // If the thread associated with the queue is gone, unlink it, after
553                     // performing a volatile read to confirm there is no data left to collect.
554                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
555                     if (cursor.hasFinalData()) {
556                         for (;;) {
557                             if (cursor.transfer(this)) {
558                                 success = true;
559                             } else {
560                                 break;
561                             }
562                         }
563                     }
564 
565                     if (prev != null) {
566                         prev.setNext(next);
567                     }
568                 } else {
569                     prev = cursor;
570                 }
571 
572                 cursor = next;
573 
574             } while (cursor != null && !success);
575 
576             this.prev = prev;
577             this.cursor = cursor;
578             return success;
579         }
580 
581         void push(DefaultHandle<?> item) {
582             Thread currentThread = Thread.currentThread();
583             if (threadRef.get() == currentThread) {
584                 // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
585                 pushNow(item);
586             } else {
587                 // The current Thread is not the one that belongs to the Stack
588                 // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
589                 // happens later.
590                 pushLater(item, currentThread);
591             }
592         }
593 
594         private void pushNow(DefaultHandle<?> item) {
595             if ((item.recycleId | item.lastRecycledId) != 0) {
596                 throw new IllegalStateException("recycled already");
597             }
598             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
599 
600             int size = this.size;
601             if (size >= maxCapacity || dropHandle(item)) {
602                 // Hit the maximum capacity or should drop - drop the possibly youngest object.
603                 return;
604             }
605             if (size == elements.length) {
606                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
607             }
608 
609             elements[size] = item;
610             this.size = size + 1;
611         }
612 
613         private void pushLater(DefaultHandle<?> item, Thread thread) {
614             // we don't want to have a ref to the queue as the value in our weak map
615             // so we null it out; to ensure there are no races with restoring it later
616             // we impose a memory ordering here (no-op on x86)
617             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
618             WeakOrderQueue queue = delayedRecycled.get(this);
619             if (queue == null) {
620                 if (delayedRecycled.size() >= maxDelayedQueues) {
621                     // Add a dummy queue so we know we should drop the object
622                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
623                     return;
624                 }
625                 // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
626                 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
627                     // drop object
628                     return;
629                 }
630                 delayedRecycled.put(this, queue);
631             } else if (queue == WeakOrderQueue.DUMMY) {
632                 // drop object
633                 return;
634             }
635 
636             queue.add(item);
637         }
638 
639         boolean dropHandle(DefaultHandle<?> handle) {
640             if (!handle.hasBeenRecycled) {
641                 if ((++handleRecycleCount & ratioMask) != 0) {
642                     // Drop the object.
643                     return true;
644                 }
645                 handle.hasBeenRecycled = true;
646             }
647             return false;
648         }
649 
650         DefaultHandle<T> newHandle() {
651             return new DefaultHandle<T>(this);
652         }
653     }
654 }