View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.lang.ref.WeakReference;
25  import java.util.Arrays;
26  import java.util.Map;
27  import java.util.WeakHashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  /**
31   * Light-weight object pool based on a thread-local stack.
32   *
33   * @param <T> the type of the pooled object
34   */
35  public abstract class Recycler<T> {
36  
37      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
38  
39      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
40      private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
41      private static final int DEFAULT_MAX_CAPACITY;
42      private static final int INITIAL_CAPACITY;
43  
44      static {
45          // In the future, we might have different maxCapacity for different object types.
46          // e.g. io.netty.recycler.maxCapacity.writeTask
47          //      io.netty.recycler.maxCapacity.outboundBuffer
48          int maxCapacity = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", 0);
49          if (maxCapacity <= 0) {
50              // TODO: Some arbitrary large number - should adjust as we get more production experience.
51              maxCapacity = 262144;
52          }
53  
54          DEFAULT_MAX_CAPACITY = maxCapacity;
55          if (logger.isDebugEnabled()) {
56              logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY);
57          }
58  
59          INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY, 256);
60      }
61  
62      private final int maxCapacity;
63      private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
64          @Override
65          protected Stack<T> initialValue() {
66              return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
67          }
68      };
69  
70      protected Recycler() {
71          this(DEFAULT_MAX_CAPACITY);
72      }
73  
74      protected Recycler(int maxCapacity) {
75          this.maxCapacity = Math.max(0, maxCapacity);
76      }
77  
78      @SuppressWarnings("unchecked")
79      public final T get() {
80          Stack<T> stack = threadLocal.get();
81          DefaultHandle<T> handle = stack.pop();
82          if (handle == null) {
83              handle = stack.newHandle();
84              handle.value = newObject(handle);
85          }
86          return (T) handle.value;
87      }
88  
89      public final boolean recycle(T o, Handle<T> handle) {
90          DefaultHandle<T> h = (DefaultHandle<T>) handle;
91          if (h.stack.parent != this) {
92              return false;
93          }
94  
95          h.recycle(o);
96          return true;
97      }
98  
99      final int threadLocalCapacity() {
100         return threadLocal.get().elements.length;
101     }
102 
103     final int threadLocalSize() {
104         return threadLocal.get().size;
105     }
106 
107     protected abstract T newObject(Handle<T> handle);
108 
109     public interface Handle<T> {
110         void recycle(T object);
111     }
112 
113     static final class DefaultHandle<T> implements Handle<T> {
114         private int lastRecycledId;
115         private int recycleId;
116 
117         private Stack<?> stack;
118         private Object value;
119 
120         DefaultHandle(Stack<?> stack) {
121             this.stack = stack;
122         }
123 
124         @Override
125         public void recycle(Object object) {
126             if (object != value) {
127                 throw new IllegalArgumentException("object does not belong to handle");
128             }
129             Thread thread = Thread.currentThread();
130             if (thread == stack.thread) {
131                 stack.push(this);
132                 return;
133             }
134             // we don't want to have a ref to the queue as the value in our weak map
135             // so we null it out; to ensure there are no races with restoring it later
136             // we impose a memory ordering here (no-op on x86)
137             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
138             WeakOrderQueue queue = delayedRecycled.get(stack);
139             if (queue == null) {
140                 delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
141             }
142             queue.add(this);
143         }
144     }
145 
146     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
147             new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
148         @Override
149         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
150             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
151         }
152     };
153 
154     // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
155     // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
156     private static final class WeakOrderQueue {
157         private static final int LINK_CAPACITY = 16;
158 
159         // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
160         @SuppressWarnings("serial")
161         private static final class Link extends AtomicInteger {
162             private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
163 
164             private int readIndex;
165             private Link next;
166         }
167 
168         // chain of data items
169         private Link head, tail;
170         // pointer to another queue of delayed items for the same stack
171         private WeakOrderQueue next;
172         private final WeakReference<Thread> owner;
173         private final int id = ID_GENERATOR.getAndIncrement();
174 
175         WeakOrderQueue(Stack<?> stack, Thread thread) {
176             head = tail = new Link();
177             owner = new WeakReference<Thread>(thread);
178             synchronized (stack) {
179                 next = stack.head;
180                 stack.head = this;
181             }
182         }
183 
184         void add(DefaultHandle<?> handle) {
185             handle.lastRecycledId = id;
186 
187             Link tail = this.tail;
188             int writeIndex;
189             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
190                 this.tail = tail = tail.next = new Link();
191                 writeIndex = tail.get();
192             }
193             tail.elements[writeIndex] = handle;
194             handle.stack = null;
195             // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
196             // this also means we guarantee visibility of an element in the queue if we see the index updated
197             tail.lazySet(writeIndex + 1);
198         }
199 
200         boolean hasFinalData() {
201             return tail.readIndex != tail.get();
202         }
203 
204         // transfer as many items as we can from this queue to the stack, returning true if any were transferred
205         @SuppressWarnings("rawtypes")
206         boolean transfer(Stack<?> dst) {
207 
208             Link head = this.head;
209             if (head == null) {
210                 return false;
211             }
212 
213             if (head.readIndex == LINK_CAPACITY) {
214                 if (head.next == null) {
215                     return false;
216                 }
217                 this.head = head = head.next;
218             }
219 
220             final int srcStart = head.readIndex;
221             int srcEnd = head.get();
222             final int srcSize = srcEnd - srcStart;
223             if (srcSize == 0) {
224                 return false;
225             }
226 
227             final int dstSize = dst.size;
228             final int expectedCapacity = dstSize + srcSize;
229 
230             if (expectedCapacity > dst.elements.length) {
231                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
232                 srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
233             }
234 
235             if (srcStart != srcEnd) {
236                 final DefaultHandle[] srcElems = head.elements;
237                 final DefaultHandle[] dstElems = dst.elements;
238                 int newDstSize = dstSize;
239                 for (int i = srcStart; i < srcEnd; i++) {
240                     DefaultHandle element = srcElems[i];
241                     if (element.recycleId == 0) {
242                         element.recycleId = element.lastRecycledId;
243                     } else if (element.recycleId != element.lastRecycledId) {
244                         throw new IllegalStateException("recycled already");
245                     }
246                     element.stack = dst;
247                     dstElems[newDstSize ++] = element;
248                     srcElems[i] = null;
249                 }
250                 dst.size = newDstSize;
251 
252                 if (srcEnd == LINK_CAPACITY && head.next != null) {
253                     this.head = head.next;
254                 }
255 
256                 head.readIndex = srcEnd;
257                 return true;
258             } else {
259                 // The destination stack is full already.
260                 return false;
261             }
262         }
263     }
264 
265     static final class Stack<T> {
266 
267         // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
268         // than the stack owner recycles: when we run out of items in our stack we iterate this collection
269         // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
270         // still recycling all items.
271         final Recycler<T> parent;
272         final Thread thread;
273         private DefaultHandle<?>[] elements;
274         private final int maxCapacity;
275         private int size;
276 
277         private volatile WeakOrderQueue head;
278         private WeakOrderQueue cursor, prev;
279 
280         Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
281             this.parent = parent;
282             this.thread = thread;
283             this.maxCapacity = maxCapacity;
284             elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
285         }
286 
287         int increaseCapacity(int expectedCapacity) {
288             int newCapacity = elements.length;
289             int maxCapacity = this.maxCapacity;
290             do {
291                 newCapacity <<= 1;
292             } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
293 
294             newCapacity = Math.min(newCapacity, maxCapacity);
295             if (newCapacity != elements.length) {
296                 elements = Arrays.copyOf(elements, newCapacity);
297             }
298 
299             return newCapacity;
300         }
301 
302         @SuppressWarnings({ "unchecked", "rawtypes" })
303         DefaultHandle<T> pop() {
304             int size = this.size;
305             if (size == 0) {
306                 if (!scavenge()) {
307                     return null;
308                 }
309                 size = this.size;
310             }
311             size --;
312             DefaultHandle ret = elements[size];
313             if (ret.lastRecycledId != ret.recycleId) {
314                 throw new IllegalStateException("recycled multiple times");
315             }
316             ret.recycleId = 0;
317             ret.lastRecycledId = 0;
318             this.size = size;
319             return ret;
320         }
321 
322         boolean scavenge() {
323             // continue an existing scavenge, if any
324             if (scavengeSome()) {
325                 return true;
326             }
327 
328             // reset our scavenge cursor
329             prev = null;
330             cursor = head;
331             return false;
332         }
333 
334         boolean scavengeSome() {
335             WeakOrderQueue cursor = this.cursor;
336             if (cursor == null) {
337                 cursor = head;
338                 if (cursor == null) {
339                     return false;
340                 }
341             }
342 
343             boolean success = false;
344             WeakOrderQueue prev = this.prev;
345             do {
346                 if (cursor.transfer(this)) {
347                     success = true;
348                     break;
349                 }
350 
351                 WeakOrderQueue next = cursor.next;
352                 if (cursor.owner.get() == null) {
353                     // If the thread associated with the queue is gone, unlink it, after
354                     // performing a volatile read to confirm there is no data left to collect.
355                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
356                     if (cursor.hasFinalData()) {
357                         for (;;) {
358                             if (cursor.transfer(this)) {
359                                 success = true;
360                             } else {
361                                 break;
362                             }
363                         }
364                     }
365                     if (prev != null) {
366                         prev.next = next;
367                     }
368                 } else {
369                     prev = cursor;
370                 }
371 
372                 cursor = next;
373 
374             } while (cursor != null && !success);
375 
376             this.prev = prev;
377             this.cursor = cursor;
378             return success;
379         }
380 
381         void push(DefaultHandle<?> item) {
382             if ((item.recycleId | item.lastRecycledId) != 0) {
383                 throw new IllegalStateException("recycled already");
384             }
385             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
386 
387             int size = this.size;
388             if (size >= maxCapacity) {
389                 // Hit the maximum capacity - drop the possibly youngest object.
390                 return;
391             }
392             if (size == elements.length) {
393                 elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
394             }
395 
396             elements[size] = item;
397             this.size = size + 1;
398         }
399 
400         DefaultHandle<T> newHandle() {
401             return new DefaultHandle<T>(this);
402         }
403     }
404 }