View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  import io.netty.util.internal.LongCounter;
20  import io.netty.util.internal.PlatformDependent;
21  import io.netty.util.internal.StringUtil;
22  
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import static io.netty.buffer.PoolChunk.isSubpage;
30  import static java.lang.Math.max;
31  
32  abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
33      static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
34  
35      enum SizeClass {
36          Small,
37          Normal
38      }
39  
40      final PooledByteBufAllocator parent;
41  
42      final int numSmallSubpagePools;
43      final int directMemoryCacheAlignment;
44      private final PoolSubpage<T>[] smallSubpagePools;
45  
46      private final PoolChunkList<T> q050;
47      private final PoolChunkList<T> q025;
48      private final PoolChunkList<T> q000;
49      private final PoolChunkList<T> qInit;
50      private final PoolChunkList<T> q075;
51      private final PoolChunkList<T> q100;
52  
53      private final List<PoolChunkListMetric> chunkListMetrics;
54  
55      // Metrics for allocations and deallocations
56      private long allocationsNormal;
57      // We need to use the LongCounter here as this is not guarded via synchronized block.
58      private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
59      private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
60      private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
61  
62      private long deallocationsSmall;
63      private long deallocationsNormal;
64  
65      // We need to use the LongCounter here as this is not guarded via synchronized block.
66      private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
67  
68      // Number of thread caches backed by this arena.
69      final AtomicInteger numThreadCaches = new AtomicInteger();
70  
71      // TODO: Test if adding padding helps under contention
72      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
73  
74      protected PoolArena(PooledByteBufAllocator parent, int pageSize,
75            int pageShifts, int chunkSize, int cacheAlignment) {
76          super(pageSize, pageShifts, chunkSize, cacheAlignment);
77          this.parent = parent;
78          directMemoryCacheAlignment = cacheAlignment;
79  
80          numSmallSubpagePools = nSubpages;
81          smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
82          for (int i = 0; i < smallSubpagePools.length; i ++) {
83              smallSubpagePools[i] = newSubpagePoolHead();
84          }
85  
86          q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
87          q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
88          q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
89          q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
90          q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
91          qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
92  
93          q100.prevList(q075);
94          q075.prevList(q050);
95          q050.prevList(q025);
96          q025.prevList(q000);
97          q000.prevList(null);
98          qInit.prevList(qInit);
99  
100         List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
101         metrics.add(qInit);
102         metrics.add(q000);
103         metrics.add(q025);
104         metrics.add(q050);
105         metrics.add(q075);
106         metrics.add(q100);
107         chunkListMetrics = Collections.unmodifiableList(metrics);
108     }
109 
110     private PoolSubpage<T> newSubpagePoolHead() {
111         PoolSubpage<T> head = new PoolSubpage<T>();
112         head.prev = head;
113         head.next = head;
114         return head;
115     }
116 
117     @SuppressWarnings("unchecked")
118     private PoolSubpage<T>[] newSubpagePoolArray(int size) {
119         return new PoolSubpage[size];
120     }
121 
122     abstract boolean isDirect();
123 
124     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
125         PooledByteBuf<T> buf = newByteBuf(maxCapacity);
126         allocate(cache, buf, reqCapacity);
127         return buf;
128     }
129 
130     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
131         final int sizeIdx = size2SizeIdx(reqCapacity);
132 
133         if (sizeIdx <= smallMaxSizeIdx) {
134             tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
135         } else if (sizeIdx < nSizes) {
136             tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
137         } else {
138             int normCapacity = directMemoryCacheAlignment > 0
139                     ? normalizeSize(reqCapacity) : reqCapacity;
140             // Huge allocations are never served via the cache so just call allocateHuge
141             allocateHuge(buf, normCapacity);
142         }
143     }
144 
145     private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
146                                      final int sizeIdx) {
147 
148         if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
149             // was able to allocate out of the cache so move on
150             return;
151         }
152 
153         /*
154          * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
155          * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
156          */
157         final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
158         final boolean needsNormalAllocation;
159         synchronized (head) {
160             final PoolSubpage<T> s = head.next;
161             needsNormalAllocation = s == head;
162             if (!needsNormalAllocation) {
163                 assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx) : "doNotDestroy=" +
164                         s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx;
165                 long handle = s.allocate();
166                 assert handle >= 0;
167                 s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
168             }
169         }
170 
171         if (needsNormalAllocation) {
172             synchronized (this) {
173                 allocateNormal(buf, reqCapacity, sizeIdx, cache);
174             }
175         }
176 
177         incSmallAllocation();
178     }
179 
180     private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
181                                       final int sizeIdx) {
182         if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
183             // was able to allocate out of the cache so move on
184             return;
185         }
186         synchronized (this) {
187             allocateNormal(buf, reqCapacity, sizeIdx, cache);
188             ++allocationsNormal;
189         }
190     }
191 
192     // Method must be called inside synchronized(this) { ... } block
193     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
194         if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
195             q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
196             q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
197             qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
198             q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
199             return;
200         }
201 
202         // Add a new chunk.
203         PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
204         boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
205         assert success;
206         qInit.add(c);
207     }
208 
209     private void incSmallAllocation() {
210         allocationsSmall.increment();
211     }
212 
213     private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
214         PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
215         activeBytesHuge.add(chunk.chunkSize());
216         buf.initUnpooled(chunk, reqCapacity);
217         allocationsHuge.increment();
218     }
219 
220     void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
221         if (chunk.unpooled) {
222             int size = chunk.chunkSize();
223             destroyChunk(chunk);
224             activeBytesHuge.add(-size);
225             deallocationsHuge.increment();
226         } else {
227             SizeClass sizeClass = sizeClass(handle);
228             if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
229                 // cached so not free it.
230                 return;
231             }
232 
233             freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
234         }
235     }
236 
237     private static SizeClass sizeClass(long handle) {
238         return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
239     }
240 
241     void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
242                    boolean finalizer) {
243         final boolean destroyChunk;
244         synchronized (this) {
245             // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
246             // may fail due lazy class-loading in for example tomcat.
247             if (!finalizer) {
248                 switch (sizeClass) {
249                     case Normal:
250                         ++deallocationsNormal;
251                         break;
252                     case Small:
253                         ++deallocationsSmall;
254                         break;
255                     default:
256                         throw new Error();
257                 }
258             }
259             destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
260         }
261         if (destroyChunk) {
262             // destroyChunk not need to be called while holding the synchronized lock.
263             destroyChunk(chunk);
264         }
265     }
266 
267     PoolSubpage<T> findSubpagePoolHead(int sizeIdx) {
268         return smallSubpagePools[sizeIdx];
269     }
270 
271     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
272         assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();
273 
274         int oldCapacity = buf.length;
275         if (oldCapacity == newCapacity) {
276             return;
277         }
278 
279         PoolChunk<T> oldChunk = buf.chunk;
280         ByteBuffer oldNioBuffer = buf.tmpNioBuf;
281         long oldHandle = buf.handle;
282         T oldMemory = buf.memory;
283         int oldOffset = buf.offset;
284         int oldMaxLength = buf.maxLength;
285 
286         // This does not touch buf's reader/writer indices
287         allocate(parent.threadCache(), buf, newCapacity);
288         int bytesToCopy;
289         if (newCapacity > oldCapacity) {
290             bytesToCopy = oldCapacity;
291         } else {
292             buf.trimIndicesToCapacity(newCapacity);
293             bytesToCopy = newCapacity;
294         }
295         memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
296         if (freeOldMemory) {
297             free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
298         }
299     }
300 
301     @Override
302     public int numThreadCaches() {
303         return numThreadCaches.get();
304     }
305 
306     @Override
307     public int numTinySubpages() {
308         return 0;
309     }
310 
311     @Override
312     public int numSmallSubpages() {
313         return smallSubpagePools.length;
314     }
315 
316     @Override
317     public int numChunkLists() {
318         return chunkListMetrics.size();
319     }
320 
321     @Override
322     public List<PoolSubpageMetric> tinySubpages() {
323         return Collections.emptyList();
324     }
325 
326     @Override
327     public List<PoolSubpageMetric> smallSubpages() {
328         return subPageMetricList(smallSubpagePools);
329     }
330 
331     @Override
332     public List<PoolChunkListMetric> chunkLists() {
333         return chunkListMetrics;
334     }
335 
336     private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
337         List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>();
338         for (PoolSubpage<?> head : pages) {
339             if (head.next == head) {
340                 continue;
341             }
342             PoolSubpage<?> s = head.next;
343             for (;;) {
344                 metrics.add(s);
345                 s = s.next;
346                 if (s == head) {
347                     break;
348                 }
349             }
350         }
351         return metrics;
352     }
353 
354     @Override
355     public long numAllocations() {
356         final long allocsNormal;
357         synchronized (this) {
358             allocsNormal = allocationsNormal;
359         }
360         return allocationsSmall.value() + allocsNormal + allocationsHuge.value();
361     }
362 
363     @Override
364     public long numTinyAllocations() {
365         return 0;
366     }
367 
368     @Override
369     public long numSmallAllocations() {
370         return allocationsSmall.value();
371     }
372 
373     @Override
374     public synchronized long numNormalAllocations() {
375         return allocationsNormal;
376     }
377 
378     @Override
379     public long numDeallocations() {
380         final long deallocs;
381         synchronized (this) {
382             deallocs = deallocationsSmall + deallocationsNormal;
383         }
384         return deallocs + deallocationsHuge.value();
385     }
386 
387     @Override
388     public long numTinyDeallocations() {
389         return 0;
390     }
391 
392     @Override
393     public synchronized long numSmallDeallocations() {
394         return deallocationsSmall;
395     }
396 
397     @Override
398     public synchronized long numNormalDeallocations() {
399         return deallocationsNormal;
400     }
401 
402     @Override
403     public long numHugeAllocations() {
404         return allocationsHuge.value();
405     }
406 
407     @Override
408     public long numHugeDeallocations() {
409         return deallocationsHuge.value();
410     }
411 
412     @Override
413     public  long numActiveAllocations() {
414         long val = allocationsSmall.value() + allocationsHuge.value()
415                 - deallocationsHuge.value();
416         synchronized (this) {
417             val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
418         }
419         return max(val, 0);
420     }
421 
422     @Override
423     public long numActiveTinyAllocations() {
424         return 0;
425     }
426 
427     @Override
428     public long numActiveSmallAllocations() {
429         return max(numSmallAllocations() - numSmallDeallocations(), 0);
430     }
431 
432     @Override
433     public long numActiveNormalAllocations() {
434         final long val;
435         synchronized (this) {
436             val = allocationsNormal - deallocationsNormal;
437         }
438         return max(val, 0);
439     }
440 
441     @Override
442     public long numActiveHugeAllocations() {
443         return max(numHugeAllocations() - numHugeDeallocations(), 0);
444     }
445 
446     @Override
447     public long numActiveBytes() {
448         long val = activeBytesHuge.value();
449         synchronized (this) {
450             for (int i = 0; i < chunkListMetrics.size(); i++) {
451                 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
452                     val += m.chunkSize();
453                 }
454             }
455         }
456         return max(0, val);
457     }
458 
459     /**
460      * Return the number of bytes that are currently pinned to buffer instances, by the arena. The pinned memory is not
461      * accessible for use by any other allocation, until the buffers using have all been released.
462      */
463     public long numPinnedBytes() {
464         long val = activeBytesHuge.value(); // Huge chunks are exact-sized for the buffers they were allocated to.
465         synchronized (this) {
466             for (int i = 0; i < chunkListMetrics.size(); i++) {
467                 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
468                     val += ((PoolChunk<?>) m).pinnedBytes();
469                 }
470             }
471         }
472         return max(0, val);
473     }
474 
475     protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
476     protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
477     protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
478     protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf<T> dst, int length);
479     protected abstract void destroyChunk(PoolChunk<T> chunk);
480 
481     @Override
482     public synchronized String toString() {
483         StringBuilder buf = new StringBuilder()
484             .append("Chunk(s) at 0~25%:")
485             .append(StringUtil.NEWLINE)
486             .append(qInit)
487             .append(StringUtil.NEWLINE)
488             .append("Chunk(s) at 0~50%:")
489             .append(StringUtil.NEWLINE)
490             .append(q000)
491             .append(StringUtil.NEWLINE)
492             .append("Chunk(s) at 25~75%:")
493             .append(StringUtil.NEWLINE)
494             .append(q025)
495             .append(StringUtil.NEWLINE)
496             .append("Chunk(s) at 50~100%:")
497             .append(StringUtil.NEWLINE)
498             .append(q050)
499             .append(StringUtil.NEWLINE)
500             .append("Chunk(s) at 75~100%:")
501             .append(StringUtil.NEWLINE)
502             .append(q075)
503             .append(StringUtil.NEWLINE)
504             .append("Chunk(s) at 100%:")
505             .append(StringUtil.NEWLINE)
506             .append(q100)
507             .append(StringUtil.NEWLINE)
508             .append("small subpages:");
509         appendPoolSubPages(buf, smallSubpagePools);
510         buf.append(StringUtil.NEWLINE);
511 
512         return buf.toString();
513     }
514 
515     private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
516         for (int i = 0; i < subpages.length; i ++) {
517             PoolSubpage<?> head = subpages[i];
518             if (head.next == head) {
519                 continue;
520             }
521 
522             buf.append(StringUtil.NEWLINE)
523                     .append(i)
524                     .append(": ");
525             PoolSubpage<?> s = head.next;
526             for (;;) {
527                 buf.append(s);
528                 s = s.next;
529                 if (s == head) {
530                     break;
531                 }
532             }
533         }
534     }
535 
536     @Override
537     protected final void finalize() throws Throwable {
538         try {
539             super.finalize();
540         } finally {
541             destroyPoolSubPages(smallSubpagePools);
542             destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
543         }
544     }
545 
546     private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
547         for (PoolSubpage<?> page : pages) {
548             page.destroy();
549         }
550     }
551 
552     private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
553         for (PoolChunkList<T> chunkList: chunkLists) {
554             chunkList.destroy(this);
555         }
556     }
557 
558     static final class HeapArena extends PoolArena<byte[]> {
559 
560         HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
561                   int chunkSize) {
562             super(parent, pageSize, pageShifts, chunkSize,
563                   0);
564         }
565 
566         private static byte[] newByteArray(int size) {
567             return PlatformDependent.allocateUninitializedArray(size);
568         }
569 
570         @Override
571         boolean isDirect() {
572             return false;
573         }
574 
575         @Override
576         protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
577             return new PoolChunk<byte[]>(
578                     this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
579         }
580 
581         @Override
582         protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
583             return new PoolChunk<byte[]>(this, null, newByteArray(capacity), capacity);
584         }
585 
586         @Override
587         protected void destroyChunk(PoolChunk<byte[]> chunk) {
588             // Rely on GC.
589         }
590 
591         @Override
592         protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
593             return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
594                     : PooledHeapByteBuf.newInstance(maxCapacity);
595         }
596 
597         @Override
598         protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf<byte[]> dst, int length) {
599             if (length == 0) {
600                 return;
601             }
602 
603             System.arraycopy(src, srcOffset, dst.memory, dst.offset, length);
604         }
605     }
606 
607     static final class DirectArena extends PoolArena<ByteBuffer> {
608 
609         DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
610                     int chunkSize, int directMemoryCacheAlignment) {
611             super(parent, pageSize, pageShifts, chunkSize,
612                   directMemoryCacheAlignment);
613         }
614 
615         @Override
616         boolean isDirect() {
617             return true;
618         }
619 
620         @Override
621         protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
622             int pageShifts, int chunkSize) {
623             if (directMemoryCacheAlignment == 0) {
624                 ByteBuffer memory = allocateDirect(chunkSize);
625                 return new PoolChunk<ByteBuffer>(this, memory, memory, pageSize, pageShifts,
626                         chunkSize, maxPageIdx);
627             }
628 
629             final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
630             final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
631             return new PoolChunk<ByteBuffer>(this, base, memory, pageSize,
632                     pageShifts, chunkSize, maxPageIdx);
633         }
634 
635         @Override
636         protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
637             if (directMemoryCacheAlignment == 0) {
638                 ByteBuffer memory = allocateDirect(capacity);
639                 return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
640             }
641 
642             final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
643             final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
644             return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
645         }
646 
647         private static ByteBuffer allocateDirect(int capacity) {
648             return PlatformDependent.useDirectBufferNoCleaner() ?
649                     PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
650         }
651 
652         @Override
653         protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
654             if (PlatformDependent.useDirectBufferNoCleaner()) {
655                 PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
656             } else {
657                 PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
658             }
659         }
660 
661         @Override
662         protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
663             if (HAS_UNSAFE) {
664                 return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
665             } else {
666                 return PooledDirectByteBuf.newInstance(maxCapacity);
667             }
668         }
669 
670         @Override
671         protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
672             if (length == 0) {
673                 return;
674             }
675 
676             if (HAS_UNSAFE) {
677                 PlatformDependent.copyMemory(
678                         PlatformDependent.directBufferAddress(src) + srcOffset,
679                         PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
680             } else {
681                 // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
682                 src = src.duplicate();
683                 ByteBuffer dst = dstBuf.internalNioBuffer();
684                 src.position(srcOffset).limit(srcOffset + length);
685                 dst.position(dstBuf.offset);
686                 dst.put(src);
687             }
688         }
689     }
690 }