1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.util;
18
19 import io.netty.util.concurrent.FastThreadLocal;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.lang.ref.WeakReference;
25 import java.util.Arrays;
26 import java.util.Map;
27 import java.util.WeakHashMap;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
31 import static java.lang.Math.max;
32 import static java.lang.Math.min;
33
34
35
36
37
38
39 public abstract class Recycler<T> {
40
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
42
43 private static final Handle NOOP_HANDLE = new Handle() { };
44 private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
45 private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
46 private static final int DEFAULT_INITIAL_MAX_CAPACITY = 32768;
47
48 private static final int DEFAULT_MAX_CAPACITY;
49 private static final int INITIAL_CAPACITY;
50 private static final int MAX_SHARED_CAPACITY_FACTOR;
51 private static final int MAX_DELAYED_QUEUES_PER_THREAD;
52 private static final int LINK_CAPACITY;
53 private static final int RATIO;
54
55 static {
56
57
58
59 int maxCapacity = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity.default",
60 DEFAULT_INITIAL_MAX_CAPACITY);
61 if (maxCapacity < 0) {
62 maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
63 }
64 DEFAULT_MAX_CAPACITY = maxCapacity;
65
66 MAX_SHARED_CAPACITY_FACTOR = max(2,
67 SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
68 2));
69
70 MAX_DELAYED_QUEUES_PER_THREAD = max(0,
71 SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
72
73 NettyRuntime.availableProcessors() * 2));
74
75 LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
76 max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
77
78
79
80
81 RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
82
83 if (logger.isDebugEnabled()) {
84 if (DEFAULT_MAX_CAPACITY == 0) {
85 logger.debug("-Dio.netty.recycler.maxCapacity.default: disabled");
86 logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
87 logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
88 logger.debug("-Dio.netty.recycler.ratio: disabled");
89 } else {
90 logger.debug("-Dio.netty.recycler.maxCapacity.default: {}", DEFAULT_MAX_CAPACITY);
91 logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
92 logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
93 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
94 }
95 }
96
97 INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
98 }
99
100 private final int maxCapacity;
101 private final int maxSharedCapacityFactor;
102 private final int ratioMask;
103 private final int maxDelayedQueuesPerThread;
104
105 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
106 @Override
107 protected Stack<T> initialValue() {
108 return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor,
109 ratioMask, maxDelayedQueuesPerThread);
110 }
111
112 @Override
113 protected void onRemoval(Stack<T> value) {
114
115 if (value.threadRef.get() == Thread.currentThread()) {
116 if (DELAYED_RECYCLED.isSet()) {
117 DELAYED_RECYCLED.get().remove(value);
118 }
119 }
120 }
121 };
122
123 protected Recycler() {
124 this(DEFAULT_MAX_CAPACITY);
125 }
126
127 protected Recycler(int maxCapacity) {
128 this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
129 }
130
131 protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
132 this(maxCapacity, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
133 }
134
135 protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
136 ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
137 if (maxCapacity <= 0) {
138 this.maxCapacity = 0;
139 this.maxSharedCapacityFactor = 1;
140 this.maxDelayedQueuesPerThread = 0;
141 } else {
142 this.maxCapacity = maxCapacity;
143 this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
144 this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
145 }
146 }
147
148 @SuppressWarnings("unchecked")
149 public final T get() {
150 if (maxCapacity == 0) {
151 return newObject(NOOP_HANDLE);
152 }
153 Stack<T> stack = threadLocal.get();
154 DefaultHandle handle = stack.pop();
155 if (handle == null) {
156 handle = stack.newHandle();
157 handle.value = newObject(handle);
158 }
159 return (T) handle.value;
160 }
161
162 public final boolean recycle(T o, Handle handle) {
163 if (handle == NOOP_HANDLE) {
164 return false;
165 }
166
167 DefaultHandle h = (DefaultHandle) handle;
168 if (h.stack.parent != this) {
169 return false;
170 }
171 if (o != h.value) {
172 throw new IllegalArgumentException("o does not belong to handle");
173 }
174 h.recycle();
175 return true;
176 }
177
178 protected abstract T newObject(Handle handle);
179
180 final int threadLocalCapacity() {
181 return threadLocal.get().elements.length;
182 }
183
184 final int threadLocalSize() {
185 return threadLocal.get().size;
186 }
187
188 public interface Handle { }
189
190 static final class DefaultHandle implements Handle {
191 private int lastRecycledId;
192 private int recycleId;
193
194 boolean hasBeenRecycled;
195
196 private Stack<?> stack;
197 private Object value;
198
199 DefaultHandle(Stack<?> stack) {
200 this.stack = stack;
201 }
202
203 public void recycle() {
204 stack.push(this);
205 }
206 }
207
208 private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
209 new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
210 @Override
211 protected Map<Stack<?>, WeakOrderQueue> initialValue() {
212 return new WeakHashMap<Stack<?>, WeakOrderQueue>();
213 }
214 };
215
216
217
218 private static final class WeakOrderQueue {
219
220 static final WeakOrderQueue DUMMY = new WeakOrderQueue();
221
222
223 @SuppressWarnings("serial")
224 private static final class Link extends AtomicInteger {
225 private final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
226
227 private int readIndex;
228 private Link next;
229 }
230
231
232 private Link head, tail;
233
234 private WeakOrderQueue next;
235 private final WeakReference<Thread> owner;
236 private final int id = ID_GENERATOR.getAndIncrement();
237 private final AtomicInteger availableSharedCapacity;
238
239 private WeakOrderQueue() {
240 owner = null;
241 availableSharedCapacity = null;
242 }
243
244 private WeakOrderQueue(Stack<?> stack, Thread thread) {
245 head = tail = new Link();
246 owner = new WeakReference<Thread>(thread);
247
248
249
250
251 availableSharedCapacity = stack.availableSharedCapacity;
252 }
253
254 static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
255 WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
256
257
258 stack.setHead(queue);
259 return queue;
260 }
261
262 private void setNext(WeakOrderQueue next) {
263 assert next != this;
264 this.next = next;
265 }
266
267
268
269
270 static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
271
272 return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
273 ? newQueue(stack, thread) : null;
274 }
275
276 private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
277 assert space >= 0;
278 for (;;) {
279 int available = availableSharedCapacity.get();
280 if (available < space) {
281 return false;
282 }
283 if (availableSharedCapacity.compareAndSet(available, available - space)) {
284 return true;
285 }
286 }
287 }
288
289 private void reclaimSpace(int space) {
290 assert space >= 0;
291 availableSharedCapacity.addAndGet(space);
292 }
293
294 void add(DefaultHandle handle) {
295 handle.lastRecycledId = id;
296
297 Link tail = this.tail;
298 int writeIndex;
299 if ((writeIndex = tail.get()) == LINK_CAPACITY) {
300 if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
301
302 return;
303 }
304
305 this.tail = tail = tail.next = new Link();
306
307 writeIndex = tail.get();
308 }
309 tail.elements[writeIndex] = handle;
310 handle.stack = null;
311
312
313 tail.lazySet(writeIndex + 1);
314 }
315
316 boolean hasFinalData() {
317 return tail.readIndex != tail.get();
318 }
319
320
321 @SuppressWarnings("rawtypes")
322 boolean transfer(Stack<?> dst) {
323 Link head = this.head;
324 if (head == null) {
325 return false;
326 }
327
328 if (head.readIndex == LINK_CAPACITY) {
329 if (head.next == null) {
330 return false;
331 }
332 this.head = head = head.next;
333 }
334
335 final int srcStart = head.readIndex;
336 int srcEnd = head.get();
337 final int srcSize = srcEnd - srcStart;
338 if (srcSize == 0) {
339 return false;
340 }
341
342 final int dstSize = dst.size;
343 final int expectedCapacity = dstSize + srcSize;
344
345 if (expectedCapacity > dst.elements.length) {
346 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
347 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
348 }
349
350 if (srcStart != srcEnd) {
351 final DefaultHandle[] srcElems = head.elements;
352 final DefaultHandle[] dstElems = dst.elements;
353 int newDstSize = dstSize;
354 for (int i = srcStart; i < srcEnd; i++) {
355 DefaultHandle element = srcElems[i];
356 if (element.recycleId == 0) {
357 element.recycleId = element.lastRecycledId;
358 } else if (element.recycleId != element.lastRecycledId) {
359 throw new IllegalStateException("recycled already");
360 }
361 srcElems[i] = null;
362
363 if (dst.dropHandle(element)) {
364
365 continue;
366 }
367 element.stack = dst;
368 dstElems[newDstSize ++] = element;
369 }
370
371 if (srcEnd == LINK_CAPACITY && head.next != null) {
372
373 reclaimSpace(LINK_CAPACITY);
374
375 this.head = head.next;
376 }
377
378 head.readIndex = srcEnd;
379 if (dst.size == newDstSize) {
380 return false;
381 }
382 dst.size = newDstSize;
383 return true;
384 } else {
385
386 return false;
387 }
388 }
389
390 @Override
391 protected void finalize() throws Throwable {
392 try {
393 super.finalize();
394 } finally {
395
396
397
398 Link link = head;
399 while (link != null) {
400 reclaimSpace(LINK_CAPACITY);
401 link = link.next;
402 }
403 }
404 }
405 }
406
407 static final class Stack<T> {
408
409
410
411
412
413 final Recycler<T> parent;
414
415
416
417
418
419
420
421 final WeakReference<Thread> threadRef;
422 final AtomicInteger availableSharedCapacity;
423 final int maxDelayedQueues;
424 private final int maxCapacity;
425 private final int ratioMask;
426 private DefaultHandle[] elements;
427 private int size;
428 private int handleRecycleCount = -1;
429 private WeakOrderQueue cursor, prev;
430 private volatile WeakOrderQueue head;
431
432 Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
433 int ratioMask, int maxDelayedQueues) {
434 this.parent = parent;
435 threadRef = new WeakReference<Thread>(thread);
436 this.maxCapacity = maxCapacity;
437 availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
438 elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
439 this.ratioMask = ratioMask;
440 this.maxDelayedQueues = maxDelayedQueues;
441 }
442
443
444 synchronized void setHead(WeakOrderQueue queue) {
445 queue.setNext(head);
446 head = queue;
447 }
448
449 int increaseCapacity(int expectedCapacity) {
450 int newCapacity = elements.length;
451 int maxCapacity = this.maxCapacity;
452 do {
453 newCapacity <<= 1;
454 } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
455
456 newCapacity = min(newCapacity, maxCapacity);
457 if (newCapacity != elements.length) {
458 elements = Arrays.copyOf(elements, newCapacity);
459 }
460
461 return newCapacity;
462 }
463
464 DefaultHandle pop() {
465 int size = this.size;
466 if (size == 0) {
467 if (!scavenge()) {
468 return null;
469 }
470 size = this.size;
471 }
472 size --;
473 DefaultHandle ret = elements[size];
474 elements[size] = null;
475 if (ret.lastRecycledId != ret.recycleId) {
476 throw new IllegalStateException("recycled multiple times");
477 }
478 ret.recycleId = 0;
479 ret.lastRecycledId = 0;
480 this.size = size;
481 return ret;
482 }
483
484 boolean scavenge() {
485
486 if (scavengeSome()) {
487 return true;
488 }
489
490
491 prev = null;
492 cursor = head;
493 return false;
494 }
495
496 boolean scavengeSome() {
497 WeakOrderQueue prev;
498 WeakOrderQueue cursor = this.cursor;
499 if (cursor == null) {
500 prev = null;
501 cursor = head;
502 if (cursor == null) {
503 return false;
504 }
505 } else {
506 prev = this.prev;
507 }
508
509 boolean success = false;
510 do {
511 if (cursor.transfer(this)) {
512 success = true;
513 break;
514 }
515 WeakOrderQueue next = cursor.next;
516 if (cursor.owner.get() == null) {
517
518
519
520 if (cursor.hasFinalData()) {
521 for (;;) {
522 if (cursor.transfer(this)) {
523 success = true;
524 } else {
525 break;
526 }
527 }
528 }
529
530 if (prev != null) {
531 prev.setNext(next);
532 }
533 } else {
534 prev = cursor;
535 }
536
537 cursor = next;
538
539 } while (cursor != null && !success);
540
541 this.prev = prev;
542 this.cursor = cursor;
543 return success;
544 }
545
546 void push(DefaultHandle item) {
547 Thread currentThread = Thread.currentThread();
548 if (threadRef.get() == currentThread) {
549
550 pushNow(item);
551 } else {
552
553
554
555 pushLater(item, currentThread);
556 }
557 }
558
559 private void pushNow(DefaultHandle item) {
560 if ((item.recycleId | item.lastRecycledId) != 0) {
561 throw new IllegalStateException("recycled already");
562 }
563 item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
564
565 int size = this.size;
566 if (size >= maxCapacity || dropHandle(item)) {
567
568 return;
569 }
570 if (size == elements.length) {
571 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
572 }
573
574 elements[size] = item;
575 this.size = size + 1;
576 }
577
578 private void pushLater(DefaultHandle item, Thread thread) {
579
580
581
582 Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
583 WeakOrderQueue queue = delayedRecycled.get(this);
584 if (queue == null) {
585 if (delayedRecycled.size() >= maxDelayedQueues) {
586
587 delayedRecycled.put(this, WeakOrderQueue.DUMMY);
588 return;
589 }
590
591 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
592
593 return;
594 }
595 delayedRecycled.put(this, queue);
596 } else if (queue == WeakOrderQueue.DUMMY) {
597
598 return;
599 }
600
601 queue.add(item);
602 }
603
604 boolean dropHandle(DefaultHandle handle) {
605 if (!handle.hasBeenRecycled) {
606 if ((++handleRecycleCount & ratioMask) != 0) {
607
608 return true;
609 }
610 handle.hasBeenRecycled = true;
611 }
612 return false;
613 }
614
615 DefaultHandle newHandle() {
616 return new DefaultHandle(this);
617 }
618 }
619 }