View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  
22  import java.nio.ByteBuffer;
23  
24  abstract class PoolArena<T> {
25  
    /** Number of tiny subpage pools: one slot per 16-byte step below 512 bytes ({@code 512 >>> 4} = 32). */
    static final int numTinySubpagePools = 512 >>> 4;

    /** The allocator that owns this arena; its {@code threadCache} is consulted when freeing and reallocating. */
    final PooledByteBufAllocator parent;

    // Chunk geometry. These four values are handed unchanged to newChunk(...) whenever a chunk is created.
    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    /**
     * {@code ~(pageSize - 1)}. A capacity that has none of these bits set is treated as smaller than a page
     * (assumes {@code pageSize} is a power of two -- TODO confirm against the allocator).
     */
    final int subpageOverflowMask;
    /** Number of small subpage pools, computed as {@code pageShifts - 9}. */
    final int numSmallSubpagePools;
    // Heads of the circular, doubly-linked subpage lists; indexed by tinyIdx(...) and smallIdx(...) respectively.
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    // Chunk lists. The constructor chains them qInit -> q000 -> q025 -> q050 -> q075 -> q100.
    // NOTE(review): the two int constructor arguments look like min/max usage percentages -- confirm in PoolChunkList.
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
48  
49      protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
50          this.parent = parent;
51          this.pageSize = pageSize;
52          this.maxOrder = maxOrder;
53          this.pageShifts = pageShifts;
54          this.chunkSize = chunkSize;
55          subpageOverflowMask = ~(pageSize - 1);
56          tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
57          for (int i = 0; i < tinySubpagePools.length; i ++) {
58              tinySubpagePools[i] = newSubpagePoolHead(pageSize);
59          }
60  
61          numSmallSubpagePools = pageShifts - 9;
62          smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
63          for (int i = 0; i < smallSubpagePools.length; i ++) {
64              smallSubpagePools[i] = newSubpagePoolHead(pageSize);
65          }
66  
67          q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
68          q075 = new PoolChunkList<T>(this, q100, 75, 100);
69          q050 = new PoolChunkList<T>(this, q075, 50, 100);
70          q025 = new PoolChunkList<T>(this, q050, 25, 75);
71          q000 = new PoolChunkList<T>(this, q025, 1, 50);
72          qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
73  
74          q100.prevList = q075;
75          q075.prevList = q050;
76          q050.prevList = q025;
77          q025.prevList = q000;
78          q000.prevList = null;
79          qInit.prevList = qInit;
80      }
81  
82      private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
83          PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
84          head.prev = head;
85          head.next = head;
86          return head;
87      }
88  
89      @SuppressWarnings("unchecked")
90      private PoolSubpage<T>[] newSubpagePoolArray(int size) {
91          return new PoolSubpage[size];
92      }
93  
    /**
     * Tells which arena flavour this is: {@code DirectArena} (backed by {@code ByteBuffer}) returns
     * {@code true}, {@code HeapArena} (backed by {@code byte[]}) returns {@code false}.
     */
    abstract boolean isDirect();
95  
96      PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
97          PooledByteBuf<T> buf = newByteBuf(maxCapacity);
98          allocate(cache, buf, reqCapacity);
99          return buf;
100     }
101 
102     static int tinyIdx(int normCapacity) {
103         return normCapacity >>> 4;
104     }
105 
106     static int smallIdx(int normCapacity) {
107         int tableIdx = 0;
108         int i = normCapacity >>> 10;
109         while (i != 0) {
110             i >>>= 1;
111             tableIdx ++;
112         }
113         return tableIdx;
114     }
115 
116     // capacity < pageSize
117     boolean isTinyOrSmall(int normCapacity) {
118         return (normCapacity & subpageOverflowMask) == 0;
119     }
120 
121     // normCapacity < 512
122     static boolean isTiny(int normCapacity) {
123         return (normCapacity & 0xFFFFFE00) == 0;
124     }
125 
    /**
     * Backs {@code buf} with {@code reqCapacity} bytes, choosing the source by normalized size:
     * <ul>
     *   <li>tiny/small (below the page size): thread cache first, then an existing subpage from the
     *       matching pool, otherwise {@code allocateNormal};</li>
     *   <li>up to {@code chunkSize}: thread cache first, otherwise {@code allocateNormal};</li>
     *   <li>larger: {@code allocateHuge}, which never goes through the cache.</li>
     * </ul>
     *
     * @param cache       the thread cache to try before touching the arena
     * @param buf         the buffer to initialize
     * @param reqCapacity the capacity requested by the caller
     */
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            if (isTiny(normCapacity)) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            // The subpage lists are only inspected while holding the arena lock.
            synchronized (this) {
                final PoolSubpage<T> head = table[tableIdx];
                final PoolSubpage<T> s = head.next;
                // head.next == head means the pool is empty; in that case fall through to allocateNormal below.
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    return;
                }
            }
        } else if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
            return;
        }
        // Reached on a cache miss, or when the matching subpage pool was empty.
        allocateNormal(buf, reqCapacity, normCapacity);
    }
170 
171     private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
172         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
173             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
174             q075.allocate(buf, reqCapacity, normCapacity) || q100.allocate(buf, reqCapacity, normCapacity)) {
175             return;
176         }
177 
178         // Add a new chunk.
179         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
180         long handle = c.allocate(normCapacity);
181         assert handle > 0;
182         c.initBuf(buf, handle, reqCapacity);
183         qInit.add(c);
184     }
185 
186     private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
187         buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
188     }
189 
190     void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
191         if (chunk.unpooled) {
192             destroyChunk(chunk);
193         } else {
194             if (sameThreads) {
195                 PoolThreadCache cache = parent.threadCache.get();
196                 if (cache.add(this, chunk, handle, normCapacity)) {
197                     // cached so not free it.
198                     return;
199                 }
200             }
201 
202             synchronized (this) {
203                 chunk.parent.free(chunk, handle);
204             }
205         }
206     }
207 
208     PoolSubpage<T> findSubpagePoolHead(int elemSize) {
209         int tableIdx;
210         PoolSubpage<T>[] table;
211         if (isTiny(elemSize)) { // < 512
212             tableIdx = elemSize >>> 4;
213             table = tinySubpagePools;
214         } else {
215             tableIdx = 0;
216             elemSize >>>= 10;
217             while (elemSize != 0) {
218                 elemSize >>>= 1;
219                 tableIdx ++;
220             }
221             table = smallSubpagePools;
222         }
223 
224         return table[tableIdx];
225     }
226 
227     int normalizeCapacity(int reqCapacity) {
228         if (reqCapacity < 0) {
229             throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
230         }
231         if (reqCapacity >= chunkSize) {
232             return reqCapacity;
233         }
234 
235         if (!isTiny(reqCapacity)) { // >= 512
236             // Doubled
237 
238             int normalizedCapacity = reqCapacity;
239             normalizedCapacity --;
240             normalizedCapacity |= normalizedCapacity >>>  1;
241             normalizedCapacity |= normalizedCapacity >>>  2;
242             normalizedCapacity |= normalizedCapacity >>>  4;
243             normalizedCapacity |= normalizedCapacity >>>  8;
244             normalizedCapacity |= normalizedCapacity >>> 16;
245             normalizedCapacity ++;
246 
247             if (normalizedCapacity < 0) {
248                 normalizedCapacity >>>= 1;
249             }
250 
251             return normalizedCapacity;
252         }
253 
254         // Quantum-spaced
255         if ((reqCapacity & 15) == 0) {
256             return reqCapacity;
257         }
258 
259         return (reqCapacity & ~15) + 16;
260     }
261 
262     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
263         if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
264             throw new IllegalArgumentException("newCapacity: " + newCapacity);
265         }
266 
267         int oldCapacity = buf.length;
268         if (oldCapacity == newCapacity) {
269             return;
270         }
271 
272         PoolChunk<T> oldChunk = buf.chunk;
273         long oldHandle = buf.handle;
274         T oldMemory = buf.memory;
275         int oldOffset = buf.offset;
276         int oldMaxLength = buf.maxLength;
277         int readerIndex = buf.readerIndex();
278         int writerIndex = buf.writerIndex();
279 
280         allocate(parent.threadCache.get(), buf, newCapacity);
281         if (newCapacity > oldCapacity) {
282             memoryCopy(
283                     oldMemory, oldOffset,
284                     buf.memory, buf.offset, oldCapacity);
285         } else if (newCapacity < oldCapacity) {
286             if (readerIndex < newCapacity) {
287                 if (writerIndex > newCapacity) {
288                     writerIndex = newCapacity;
289                 }
290                 memoryCopy(
291                         oldMemory, oldOffset + readerIndex,
292                         buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
293             } else {
294                 readerIndex = writerIndex = newCapacity;
295             }
296         }
297 
298         buf.setIndex(readerIndex, writerIndex);
299 
300         if (freeOldMemory) {
301             free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread());
302         }
303     }
304 
    /** Creates a new pooled chunk with the given geometry; called by {@code allocateNormal} when no chunk list can serve a request. */
    protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
    /** Creates an unpooled chunk of exactly {@code capacity}; called by {@code allocateHuge}. */
    protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
    /** Supplies the buffer object that {@code allocate(cache, reqCapacity, maxCapacity)} then initializes. */
    protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
    /** Copies {@code length} units from {@code src} at {@code srcOffset} to {@code dst} at {@code dstOffset}; used by {@code reallocate}. */
    protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
    /** Releases the memory of a chunk; called by {@code free} for unpooled chunks. */
    protected abstract void destroyChunk(PoolChunk<T> chunk);
310 
311     public synchronized String toString() {
312         StringBuilder buf = new StringBuilder()
313             .append("Chunk(s) at 0~25%:")
314             .append(StringUtil.NEWLINE)
315             .append(qInit)
316             .append(StringUtil.NEWLINE)
317             .append("Chunk(s) at 0~50%:")
318             .append(StringUtil.NEWLINE)
319             .append(q000)
320             .append(StringUtil.NEWLINE)
321             .append("Chunk(s) at 25~75%:")
322             .append(StringUtil.NEWLINE)
323             .append(q025)
324             .append(StringUtil.NEWLINE)
325             .append("Chunk(s) at 50~100%:")
326             .append(StringUtil.NEWLINE)
327             .append(q050)
328             .append(StringUtil.NEWLINE)
329             .append("Chunk(s) at 75~100%:")
330             .append(StringUtil.NEWLINE)
331             .append(q075)
332             .append(StringUtil.NEWLINE)
333             .append("Chunk(s) at 100%:")
334             .append(StringUtil.NEWLINE)
335             .append(q100)
336             .append(StringUtil.NEWLINE)
337             .append("tiny subpages:");
338         for (int i = 1; i < tinySubpagePools.length; i ++) {
339             PoolSubpage<T> head = tinySubpagePools[i];
340             if (head.next == head) {
341                 continue;
342             }
343 
344             buf.append(StringUtil.NEWLINE)
345                .append(i)
346                .append(": ");
347             PoolSubpage<T> s = head.next;
348             for (;;) {
349                 buf.append(s);
350                 s = s.next;
351                 if (s == head) {
352                     break;
353                 }
354             }
355         }
356         buf.append(StringUtil.NEWLINE)
357            .append("small subpages:");
358         for (int i = 1; i < smallSubpagePools.length; i ++) {
359             PoolSubpage<T> head = smallSubpagePools[i];
360             if (head.next == head) {
361                 continue;
362             }
363 
364             buf.append(StringUtil.NEWLINE)
365                .append(i)
366                .append(": ");
367             PoolSubpage<T> s = head.next;
368             for (;;) {
369                 buf.append(s);
370                 s = s.next;
371                 if (s == head) {
372                     break;
373                 }
374             }
375         }
376         buf.append(StringUtil.NEWLINE);
377 
378         return buf.toString();
379     }
380 
381     static final class HeapArena extends PoolArena<byte[]> {
382 
383         HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
384             super(parent, pageSize, maxOrder, pageShifts, chunkSize);
385         }
386 
387         @Override
388         boolean isDirect() {
389             return false;
390         }
391 
392         @Override
393         protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
394             return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
395         }
396 
397         @Override
398         protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
399             return new PoolChunk<byte[]>(this, new byte[capacity], capacity);
400         }
401 
402         @Override
403         protected void destroyChunk(PoolChunk<byte[]> chunk) {
404             // Rely on GC.
405         }
406 
407         @Override
408         protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
409             return PooledHeapByteBuf.newInstance(maxCapacity);
410         }
411 
412         @Override
413         protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
414             if (length == 0) {
415                 return;
416             }
417 
418             System.arraycopy(src, srcOffset, dst, dstOffset, length);
419         }
420     }
421 
422     static final class DirectArena extends PoolArena<ByteBuffer> {
423 
424         private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
425 
426         DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
427             super(parent, pageSize, maxOrder, pageShifts, chunkSize);
428         }
429 
430         @Override
431         boolean isDirect() {
432             return true;
433         }
434 
435         @Override
436         protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
437             return new PoolChunk<ByteBuffer>(
438                     this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
439         }
440 
441         @Override
442         protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
443             return new PoolChunk<ByteBuffer>(this, ByteBuffer.allocateDirect(capacity), capacity);
444         }
445 
446         @Override
447         protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
448             PlatformDependent.freeDirectBuffer(chunk.memory);
449         }
450 
451         @Override
452         protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
453             if (HAS_UNSAFE) {
454                 return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
455             } else {
456                 return PooledDirectByteBuf.newInstance(maxCapacity);
457             }
458         }
459 
460         @Override
461         protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
462             if (length == 0) {
463                 return;
464             }
465 
466             if (HAS_UNSAFE) {
467                 PlatformDependent.copyMemory(
468                         PlatformDependent.directBufferAddress(src) + srcOffset,
469                         PlatformDependent.directBufferAddress(dst) + dstOffset, length);
470             } else {
471                 // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
472                 src = src.duplicate();
473                 dst = dst.duplicate();
474                 src.position(srcOffset).limit(srcOffset + length);
475                 dst.position(dstOffset);
476                 dst.put(src);
477             }
478         }
479     }
480 }