View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.socket.nio.NioSocketChannel;
22  import io.netty.util.Recycler;
23  import io.netty.util.Recycler.Handle;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.internal.InternalThreadLocalMap;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.logging.InternalLogger;
29  import io.netty.util.internal.logging.InternalLoggerFactory;
30  
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ClosedChannelException;
33  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
35  
36  /**
37   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
38   * outbound write requests.
39   * <p>
40   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
41   * <ul>
42   * <li>{@link #size()} and {@link #isEmpty()}</li>
43   * <li>{@link #isWritable()}</li>
44   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
45   * </ul>
46   * </p>
47   */
48  public final class ChannelOutboundBuffer {
49  
50      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
51  
52      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
53          @Override
54          protected ByteBuffer[] initialValue() throws Exception {
55              return new ByteBuffer[1024];
56          }
57      };
58  
59      private final Channel channel;
60  
61      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
62      //
63      // The Entry that is the first in the linked-list structure that was flushed
64      private Entry flushedEntry;
65      // The Entry which is the first unflushed in the linked-list structure
66      private Entry unflushedEntry;
67      // The Entry which represents the tail of the buffer
68      private Entry tailEntry;
69      // The number of flushed entries that are not written yet
70      private int flushed;
71  
72      private int nioBufferCount;
73      private long nioBufferSize;
74  
75      private boolean inFail;
76  
77      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;
78  
79      @SuppressWarnings("UnusedDeclaration")
80      private volatile long totalPendingSize;
81  
82      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;
83  
84      @SuppressWarnings("UnusedDeclaration")
85      private volatile int unwritable;
86  
87      private volatile Runnable fireChannelWritabilityChangedTask;
88  
89      static {
90          AtomicIntegerFieldUpdater<ChannelOutboundBuffer> unwritableUpdater =
91                  PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "unwritable");
92          if (unwritableUpdater == null) {
93              unwritableUpdater = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
94          }
95          UNWRITABLE_UPDATER = unwritableUpdater;
96  
97          AtomicLongFieldUpdater<ChannelOutboundBuffer> pendingSizeUpdater =
98                  PlatformDependent.newAtomicLongFieldUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
99          if (pendingSizeUpdater == null) {
100             pendingSizeUpdater = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
101         }
102         TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater;
103     }
104 
105     ChannelOutboundBuffer(AbstractChannel channel) {
106         this.channel = channel;
107     }
108 
109     /**
110      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
111      * the message was written.
112      */
113     public void addMessage(Object msg, int size, ChannelPromise promise) {
114         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
115         if (tailEntry == null) {
116             flushedEntry = null;
117             tailEntry = entry;
118         } else {
119             Entry tail = tailEntry;
120             tail.next = entry;
121             tailEntry = entry;
122         }
123         if (unflushedEntry == null) {
124             unflushedEntry = entry;
125         }
126 
127         // increment pending bytes after adding message to the unflushed arrays.
128         // See https://github.com/netty/netty/issues/1619
129         incrementPendingOutboundBytes(size, false);
130     }
131 
132     /**
133      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
134      * and so you will be able to handle them.
135      */
136     public void addFlush() {
137         // There is no need to process all entries if there was already a flush before and no new messages
138         // where added in the meantime.
139         //
140         // See https://github.com/netty/netty/issues/2577
141         Entry entry = unflushedEntry;
142         if (entry != null) {
143             if (flushedEntry == null) {
144                 // there is no flushedEntry yet, so start with the entry
145                 flushedEntry = entry;
146             }
147             do {
148                 flushed ++;
149                 if (!entry.promise.setUncancellable()) {
150                     // Was cancelled so make sure we free up memory and notify about the freed bytes
151                     int pending = entry.cancel();
152                     decrementPendingOutboundBytes(pending, false);
153                 }
154                 entry = entry.next;
155             } while (entry != null);
156 
157             // All flushed so reset unflushedEntry
158             unflushedEntry = null;
159         }
160     }
161 
162     /**
163      * Increment the pending bytes which will be written at some point.
164      * This method is thread-safe!
165      */
166     void incrementPendingOutboundBytes(long size) {
167         incrementPendingOutboundBytes(size, true);
168     }
169 
170     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
171         if (size == 0) {
172             return;
173         }
174 
175         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
176         if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
177             setUnwritable(invokeLater);
178         }
179     }
180 
181     /**
182      * Decrement the pending bytes which will be written at some point.
183      * This method is thread-safe!
184      */
185     void decrementPendingOutboundBytes(long size) {
186         decrementPendingOutboundBytes(size, true);
187     }
188 
189     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
190         if (size == 0) {
191             return;
192         }
193 
194         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
195         if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
196             setWritable(invokeLater);
197         }
198     }
199 
200     private static long total(Object msg) {
201         if (msg instanceof ByteBuf) {
202             return ((ByteBuf) msg).readableBytes();
203         }
204         if (msg instanceof FileRegion) {
205             return ((FileRegion) msg).count();
206         }
207         if (msg instanceof ByteBufHolder) {
208             return ((ByteBufHolder) msg).content().readableBytes();
209         }
210         return -1;
211     }
212 
213     /**
214      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
215      */
216     public Object current() {
217         Entry entry = flushedEntry;
218         if (entry == null) {
219             return null;
220         }
221 
222         return entry.msg;
223     }
224 
225     /**
226      * Notify the {@link ChannelPromise} of the current message about writing progress.
227      */
228     public void progress(long amount) {
229         Entry e = flushedEntry;
230         assert e != null;
231         ChannelPromise p = e.promise;
232         if (p instanceof ChannelProgressivePromise) {
233             long progress = e.progress + amount;
234             e.progress = progress;
235             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
236         }
237     }
238 
239     /**
240      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
241      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
242      * messages are ready to be handled.
243      */
244     public boolean remove() {
245         Entry e = flushedEntry;
246         if (e == null) {
247             return false;
248         }
249         Object msg = e.msg;
250 
251         ChannelPromise promise = e.promise;
252         int size = e.pendingSize;
253 
254         removeEntry(e);
255 
256         if (!e.cancelled) {
257             // only release message, notify and decrement if it was not canceled before.
258             ReferenceCountUtil.safeRelease(msg);
259             safeSuccess(promise);
260             decrementPendingOutboundBytes(size, false);
261         }
262 
263         // recycle the entry
264         e.recycle();
265 
266         return true;
267     }
268 
269     /**
270      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
271      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
272      * {@code false} to signal that no more messages are ready to be handled.
273      */
274     public boolean remove(Throwable cause) {
275         Entry e = flushedEntry;
276         if (e == null) {
277             return false;
278         }
279         Object msg = e.msg;
280 
281         ChannelPromise promise = e.promise;
282         int size = e.pendingSize;
283 
284         removeEntry(e);
285 
286         if (!e.cancelled) {
287             // only release message, fail and decrement if it was not canceled before.
288             ReferenceCountUtil.safeRelease(msg);
289 
290             safeFail(promise, cause);
291             decrementPendingOutboundBytes(size, false);
292         }
293 
294         // recycle the entry
295         e.recycle();
296 
297         return true;
298     }
299 
300     private void removeEntry(Entry e) {
301         if (-- flushed == 0) {
302             // processed everything
303             flushedEntry = null;
304             if (e == tailEntry) {
305                 tailEntry = null;
306                 unflushedEntry = null;
307             }
308         } else {
309             flushedEntry = e.next;
310         }
311     }
312 
313     /**
314      * Removes the fully written entries and update the reader index of the partially written entry.
315      * This operation assumes all messages in this buffer is {@link ByteBuf}.
316      */
317     public void removeBytes(long writtenBytes) {
318         for (;;) {
319             Object msg = current();
320             if (!(msg instanceof ByteBuf)) {
321                 assert writtenBytes == 0;
322                 break;
323             }
324 
325             final ByteBuf buf = (ByteBuf) msg;
326             final int readerIndex = buf.readerIndex();
327             final int readableBytes = buf.writerIndex() - readerIndex;
328 
329             if (readableBytes <= writtenBytes) {
330                 if (writtenBytes != 0) {
331                     progress(readableBytes);
332                     writtenBytes -= readableBytes;
333                 }
334                 remove();
335             } else { // readableBytes > writtenBytes
336                 if (writtenBytes != 0) {
337                     buf.readerIndex(readerIndex + (int) writtenBytes);
338                     progress(writtenBytes);
339                 }
340                 break;
341             }
342         }
343     }
344 
345     /**
346      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
347      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
348      * array and the total number of readable bytes of the NIO buffers respectively.
349      * <p>
350      * Note that the returned array is reused and thus should not escape
351      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
352      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
353      * </p>
354      */
355     public ByteBuffer[] nioBuffers() {
356         long nioBufferSize = 0;
357         int nioBufferCount = 0;
358         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
359         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
360         Entry entry = flushedEntry;
361         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
362             if (!entry.cancelled) {
363                 ByteBuf buf = (ByteBuf) entry.msg;
364                 final int readerIndex = buf.readerIndex();
365                 final int readableBytes = buf.writerIndex() - readerIndex;
366 
367                 if (readableBytes > 0) {
368                     nioBufferSize += readableBytes;
369                     int count = entry.count;
370                     if (count == -1) {
371                         //noinspection ConstantValueVariableUse
372                         entry.count = count =  buf.nioBufferCount();
373                     }
374                     int neededSpace = nioBufferCount + count;
375                     if (neededSpace > nioBuffers.length) {
376                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
377                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
378                     }
379                     if (count == 1) {
380                         ByteBuffer nioBuf = entry.buf;
381                         if (nioBuf == null) {
382                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
383                             // derived buffer
384                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
385                         }
386                         nioBuffers[nioBufferCount ++] = nioBuf;
387                     } else {
388                         ByteBuffer[] nioBufs = entry.bufs;
389                         if (nioBufs == null) {
390                             // cached ByteBuffers as they may be expensive to create in terms
391                             // of Object allocation
392                             entry.bufs = nioBufs = buf.nioBuffers();
393                         }
394                         nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
395                     }
396                 }
397             }
398             entry = entry.next;
399         }
400         this.nioBufferCount = nioBufferCount;
401         this.nioBufferSize = nioBufferSize;
402 
403         return nioBuffers;
404     }
405 
406     private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
407         for (ByteBuffer nioBuf: nioBufs) {
408             if (nioBuf == null) {
409                 break;
410             }
411             nioBuffers[nioBufferCount ++] = nioBuf;
412         }
413         return nioBufferCount;
414     }
415 
416     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
417         int newCapacity = array.length;
418         do {
419             // double capacity until it is big enough
420             // See https://github.com/netty/netty/issues/1890
421             newCapacity <<= 1;
422 
423             if (newCapacity < 0) {
424                 throw new IllegalStateException();
425             }
426 
427         } while (neededSpace > newCapacity);
428 
429         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
430         System.arraycopy(array, 0, newArray, 0, size);
431 
432         return newArray;
433     }
434 
435     /**
436      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
437      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
438      * was called.
439      */
440     public int nioBufferCount() {
441         return nioBufferCount;
442     }
443 
444     /**
445      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
446      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
447      * was called.
448      */
449     public long nioBufferSize() {
450         return nioBufferSize;
451     }
452 
453     /**
454      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
455      * not exceed the write watermark of the {@link Channel} and
456      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
457      * {@code false}.
458      */
459     public boolean isWritable() {
460         return unwritable == 0;
461     }
462 
463     /**
464      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
465      * {@code true}.
466      */
467     public boolean getUserDefinedWritability(int index) {
468         return (unwritable & writabilityMask(index)) == 0;
469     }
470 
471     /**
472      * Sets a user-defined writability flag at the specified index.
473      */
474     public void setUserDefinedWritability(int index, boolean writable) {
475         if (writable) {
476             setUserDefinedWritability(index);
477         } else {
478             clearUserDefinedWritability(index);
479         }
480     }
481 
482     private void setUserDefinedWritability(int index) {
483         final int mask = ~writabilityMask(index);
484         for (;;) {
485             final int oldValue = unwritable;
486             final int newValue = oldValue & mask;
487             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
488                 if (oldValue != 0 && newValue == 0) {
489                     fireChannelWritabilityChanged(true);
490                 }
491                 break;
492             }
493         }
494     }
495 
496     private void clearUserDefinedWritability(int index) {
497         final int mask = writabilityMask(index);
498         for (;;) {
499             final int oldValue = unwritable;
500             final int newValue = oldValue | mask;
501             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
502                 if (oldValue == 0 && newValue != 0) {
503                     fireChannelWritabilityChanged(true);
504                 }
505                 break;
506             }
507         }
508     }
509 
510     private static int writabilityMask(int index) {
511         if (index < 1 || index > 31) {
512             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
513         }
514         return 1 << index;
515     }
516 
517     private void setWritable(boolean invokeLater) {
518         for (;;) {
519             final int oldValue = unwritable;
520             final int newValue = oldValue & ~1;
521             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
522                 if (oldValue != 0 && newValue == 0) {
523                     fireChannelWritabilityChanged(invokeLater);
524                 }
525                 break;
526             }
527         }
528     }
529 
530     private void setUnwritable(boolean invokeLater) {
531         for (;;) {
532             final int oldValue = unwritable;
533             final int newValue = oldValue | 1;
534             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
535                 if (oldValue == 0 && newValue != 0) {
536                     fireChannelWritabilityChanged(invokeLater);
537                 }
538                 break;
539             }
540         }
541     }
542 
543     private void fireChannelWritabilityChanged(boolean invokeLater) {
544         final ChannelPipeline pipeline = channel.pipeline();
545         if (invokeLater) {
546             Runnable task = fireChannelWritabilityChangedTask;
547             if (task == null) {
548                 fireChannelWritabilityChangedTask = task = new Runnable() {
549                     @Override
550                     public void run() {
551                         pipeline.fireChannelWritabilityChanged();
552                     }
553                 };
554             }
555             channel.eventLoop().execute(task);
556         } else {
557             pipeline.fireChannelWritabilityChanged();
558         }
559     }
560 
561     /**
562      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
563      */
564     public int size() {
565         return flushed;
566     }
567 
568     /**
569      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
570      * otherwise.
571      */
572     public boolean isEmpty() {
573         return flushed == 0;
574     }
575 
576     void failFlushed(Throwable cause) {
577         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
578         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
579         // indirectly (usually by closing the channel.)
580         //
581         // See https://github.com/netty/netty/issues/1501
582         if (inFail) {
583             return;
584         }
585 
586         try {
587             inFail = true;
588             for (;;) {
589                 if (!remove(cause)) {
590                     break;
591                 }
592             }
593         } finally {
594             inFail = false;
595         }
596     }
597 
598     void close(final ClosedChannelException cause) {
599         if (inFail) {
600             channel.eventLoop().execute(new Runnable() {
601                 @Override
602                 public void run() {
603                     close(cause);
604                 }
605             });
606             return;
607         }
608 
609         inFail = true;
610 
611         if (channel.isOpen()) {
612             throw new IllegalStateException("close() must be invoked after the channel is closed.");
613         }
614 
615         if (!isEmpty()) {
616             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
617         }
618 
619         // Release all unflushed messages.
620         try {
621             Entry e = unflushedEntry;
622             while (e != null) {
623                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
624                 int size = e.pendingSize;
625                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
626 
627                 if (!e.cancelled) {
628                     ReferenceCountUtil.safeRelease(e.msg);
629                     safeFail(e.promise, cause);
630                 }
631                 e = e.recycleAndGetNext();
632             }
633         } finally {
634             inFail = false;
635         }
636     }
637 
638     private static void safeSuccess(ChannelPromise promise) {
639         if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
640             logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
641         }
642     }
643 
644     private static void safeFail(ChannelPromise promise, Throwable cause) {
645         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
646             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
647         }
648     }
649 
650     @Deprecated
651     public void recycle() {
652         // NOOP
653     }
654 
655     public long totalPendingWriteBytes() {
656         return totalPendingSize;
657     }
658 
659     /**
660      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
661      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
662      * returns {@code false} or there are no more flushed messages to process.
663      */
664     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
665         if (processor == null) {
666             throw new NullPointerException("processor");
667         }
668 
669         Entry entry = flushedEntry;
670         if (entry == null) {
671             return;
672         }
673 
674         do {
675             if (!entry.cancelled) {
676                 if (!processor.processMessage(entry.msg)) {
677                     return;
678                 }
679             }
680             entry = entry.next;
681         } while (isFlushedEntry(entry));
682     }
683 
684     private boolean isFlushedEntry(Entry e) {
685         return e != null && e != unflushedEntry;
686     }
687 
688     public interface MessageProcessor {
689         /**
690          * Will be called for each flushed message until it either there are no more flushed messages or this
691          * method returns {@code false}.
692          */
693         boolean processMessage(Object msg) throws Exception;
694     }
695 
696     static final class Entry {
697         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
698             @Override
699             protected Entry newObject(Handle handle) {
700                 return new Entry(handle);
701             }
702         };
703 
704         private final Handle handle;
705         Entry next;
706         Object msg;
707         ByteBuffer[] bufs;
708         ByteBuffer buf;
709         ChannelPromise promise;
710         long progress;
711         long total;
712         int pendingSize;
713         int count = -1;
714         boolean cancelled;
715 
716         private Entry(Handle handle) {
717             this.handle = handle;
718         }
719 
720         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
721             Entry entry = RECYCLER.get();
722             entry.msg = msg;
723             entry.pendingSize = size;
724             entry.total = total;
725             entry.promise = promise;
726             return entry;
727         }
728 
729         int cancel() {
730             if (!cancelled) {
731                 cancelled = true;
732                 int pSize = pendingSize;
733 
734                 // release message and replace with an empty buffer
735                 ReferenceCountUtil.safeRelease(msg);
736                 msg = Unpooled.EMPTY_BUFFER;
737 
738                 pendingSize = 0;
739                 total = 0;
740                 progress = 0;
741                 bufs = null;
742                 buf = null;
743                 return pSize;
744             }
745             return 0;
746         }
747 
748         void recycle() {
749             next = null;
750             bufs = null;
751             buf = null;
752             msg = null;
753             promise = null;
754             progress = 0;
755             total = 0;
756             pendingSize = 0;
757             count = -1;
758             cancelled = false;
759             RECYCLER.recycle(this, handle);
760         }
761 
762         Entry recycleAndGetNext() {
763             Entry next = this.next;
764             recycle();
765             return next;
766         }
767     }
768 }