View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  import io.netty.util.concurrent.FastThreadLocal;
20  import io.netty.util.internal.PlatformDependent;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.nio.ByteBuffer;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  public class PooledByteBufAllocator extends AbstractByteBufAllocator {
29  
30      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
31      private static final int DEFAULT_NUM_HEAP_ARENA;
32      private static final int DEFAULT_NUM_DIRECT_ARENA;
33  
34      private static final int DEFAULT_PAGE_SIZE;
35      private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
36      private static final int DEFAULT_TINY_CACHE_SIZE;
37      private static final int DEFAULT_SMALL_CACHE_SIZE;
38      private static final int DEFAULT_NORMAL_CACHE_SIZE;
39      private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
40      private static final int DEFAULT_CACHE_TRIM_INTERVAL;
41  
42      private static final int MIN_PAGE_SIZE = 4096;
43      private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
44  
45      static {
46          int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
47          Throwable pageSizeFallbackCause = null;
48          try {
49              validateAndCalculatePageShifts(defaultPageSize);
50          } catch (Throwable t) {
51              pageSizeFallbackCause = t;
52              defaultPageSize = 8192;
53          }
54          DEFAULT_PAGE_SIZE = defaultPageSize;
55  
56          int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
57          Throwable maxOrderFallbackCause = null;
58          try {
59              validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
60          } catch (Throwable t) {
61              maxOrderFallbackCause = t;
62              defaultMaxOrder = 11;
63          }
64          DEFAULT_MAX_ORDER = defaultMaxOrder;
65  
66          // Determine reasonable default for nHeapArena and nDirectArena.
67          // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
68          final Runtime runtime = Runtime.getRuntime();
69          final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
70          DEFAULT_NUM_HEAP_ARENA = Math.max(0,
71                  SystemPropertyUtil.getInt(
72                          "io.netty.allocator.numHeapArenas",
73                          (int) Math.min(
74                                  runtime.availableProcessors(),
75                                  Runtime.getRuntime().maxMemory() / defaultChunkSize / 2 / 3)));
76          DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
77                  SystemPropertyUtil.getInt(
78                          "io.netty.allocator.numDirectArenas",
79                          (int) Math.min(
80                                  runtime.availableProcessors(),
81                                  PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
82  
83          // cache sizes
84          DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
85          DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
86          DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
87  
88          // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
89          // 'Scalable memory allocation using jemalloc'
90          DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
91                  "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
92  
93          // the number of threshold of allocations when cached entries will be freed up if not frequently used
94          DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
95                  "io.netty.allocator.cacheTrimInterval", 8192);
96  
97          if (logger.isDebugEnabled()) {
98              logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
99              logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
100             if (pageSizeFallbackCause == null) {
101                 logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
102             } else {
103                 logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
104             }
105             if (maxOrderFallbackCause == null) {
106                 logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
107             } else {
108                 logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
109             }
110             logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
111             logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
112             logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
113             logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
114             logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
115             logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
116         }
117     }
118 
119     public static final PooledByteBufAllocator DEFAULT =
120             new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
121 
122     private final PoolArena<byte[]>[] heapArenas;
123     private final PoolArena<ByteBuffer>[] directArenas;
124     private final int tinyCacheSize;
125     private final int smallCacheSize;
126     private final int normalCacheSize;
127 
128     final PoolThreadLocalCache threadCache;
129 
130     public PooledByteBufAllocator() {
131         this(false);
132     }
133 
134     public PooledByteBufAllocator(boolean preferDirect) {
135         this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
136     }
137 
138     public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
139         this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
140     }
141 
142     public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
143         this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
144                 DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
145     }
146 
147     public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
148                                   int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
149         super(preferDirect);
150         threadCache = new PoolThreadLocalCache();
151         this.tinyCacheSize = tinyCacheSize;
152         this.smallCacheSize = smallCacheSize;
153         this.normalCacheSize = normalCacheSize;
154         final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
155 
156         if (nHeapArena < 0) {
157             throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
158         }
159         if (nDirectArena < 0) {
160             throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
161         }
162 
163         int pageShifts = validateAndCalculatePageShifts(pageSize);
164 
165         if (nHeapArena > 0) {
166             heapArenas = newArenaArray(nHeapArena);
167             for (int i = 0; i < heapArenas.length; i ++) {
168                 heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
169             }
170         } else {
171             heapArenas = null;
172         }
173 
174         if (nDirectArena > 0) {
175             directArenas = newArenaArray(nDirectArena);
176             for (int i = 0; i < directArenas.length; i ++) {
177                 directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
178             }
179         } else {
180             directArenas = null;
181         }
182     }
183 
184     @SuppressWarnings("unchecked")
185     private static <T> PoolArena<T>[] newArenaArray(int size) {
186         return new PoolArena[size];
187     }
188 
189     private static int validateAndCalculatePageShifts(int pageSize) {
190         if (pageSize < MIN_PAGE_SIZE) {
191             throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + "+)");
192         }
193 
194         if ((pageSize & pageSize - 1) != 0) {
195             throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
196         }
197 
198         // Logarithm base 2. At this point we know that pageSize is a power of two.
199         return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
200     }
201 
202     private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
203         if (maxOrder > 14) {
204             throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
205         }
206 
207         // Ensure the resulting chunkSize does not overflow.
208         int chunkSize = pageSize;
209         for (int i = maxOrder; i > 0; i --) {
210             if (chunkSize > MAX_CHUNK_SIZE / 2) {
211                 throw new IllegalArgumentException(String.format(
212                         "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
213             }
214             chunkSize <<= 1;
215         }
216         return chunkSize;
217     }
218 
219     @Override
220     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
221         PoolThreadCache cache = threadCache.get();
222         PoolArena<byte[]> heapArena = cache.heapArena;
223 
224         ByteBuf buf;
225         if (heapArena != null) {
226             buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
227         } else {
228             buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
229         }
230 
231         return toLeakAwareBuffer(buf);
232     }
233 
234     @Override
235     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
236         PoolThreadCache cache = threadCache.get();
237         PoolArena<ByteBuffer> directArena = cache.directArena;
238 
239         ByteBuf buf;
240         if (directArena != null) {
241             buf = directArena.allocate(cache, initialCapacity, maxCapacity);
242         } else {
243             if (PlatformDependent.hasUnsafe()) {
244                 buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
245             } else {
246                 buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
247             }
248         }
249 
250         return toLeakAwareBuffer(buf);
251     }
252 
253     @Override
254     public boolean isDirectBufferPooled() {
255         return directArenas != null;
256     }
257 
258     final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
259         private final AtomicInteger index = new AtomicInteger();
260 
261         @Override
262         protected PoolThreadCache initialValue() {
263             final int idx = index.getAndIncrement();
264             final PoolArena<byte[]> heapArena;
265             final PoolArena<ByteBuffer> directArena;
266 
267             if (heapArenas != null) {
268                 heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
269             } else {
270                 heapArena = null;
271             }
272 
273             if (directArenas != null) {
274                 directArena = directArenas[Math.abs(idx % directArenas.length)];
275             } else {
276                 directArena = null;
277             }
278 
279             return new PoolThreadCache(
280                     heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
281                     DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
282         }
283 
284         @Override
285         protected void onRemoval(PoolThreadCache value) {
286             value.free();
287         }
288     }
289 
290 //    Too noisy at the moment.
291 //
292 //    public String toString() {
293 //        StringBuilder buf = new StringBuilder();
294 //        buf.append(heapArenas.length);
295 //        buf.append(" heap arena(s):");
296 //        buf.append(StringUtil.NEWLINE);
297 //        for (PoolArena<byte[]> a: heapArenas) {
298 //            buf.append(a);
299 //        }
300 //        buf.append(directArenas.length);
301 //        buf.append(" direct arena(s):");
302 //        buf.append(StringUtil.NEWLINE);
303 //        for (PoolArena<ByteBuffer> a: directArenas) {
304 //            buf.append(a);
305 //        }
306 //        return buf.toString();
307 //    }
308 }