View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.socket.nio.NioSocketChannel;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.InternalThreadLocalMap;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectPool.Handle;
27  import io.netty.util.internal.ObjectPool.ObjectCreator;
28  import io.netty.util.internal.ObjectUtil;
29  import io.netty.util.internal.PromiseNotificationUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.nio.ByteBuffer;
35  import java.nio.channels.ClosedChannelException;
36  import java.util.Arrays;
37  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
39  
40  import static java.lang.Math.min;
41  
42  /**
43   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
44   * outbound write requests.
45   * <p>
46   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
47   * <ul>
48   * <li>{@link #size()} and {@link #isEmpty()}</li>
49   * <li>{@link #isWritable()}</li>
50   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
51   * </ul>
52   * </p>
53   */
54  public final class ChannelOutboundBuffer {
55      // Assuming a 64-bit JVM:
56      //  - 16 bytes object header
57      //  - 6 reference fields
58      //  - 2 long fields
59      //  - 2 int fields
60      //  - 1 boolean field
61      //  - padding
62      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
63              SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
64  
65      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
66  
67      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
68          @Override
69          protected ByteBuffer[] initialValue() throws Exception {
70              return new ByteBuffer[1024];
71          }
72      };
73  
74      private final Channel channel;
75  
76      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
77      //
78      // The Entry that is the first in the linked-list structure that was flushed
79      private Entry flushedEntry;
80      // The Entry which is the first unflushed in the linked-list structure
81      private Entry unflushedEntry;
82      // The Entry which represents the tail of the buffer
83      private Entry tailEntry;
84      // The number of flushed entries that are not written yet
85      private int flushed;
86  
87      private int nioBufferCount;
88      private long nioBufferSize;
89  
90      private boolean inFail;
91  
92      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
93              AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
94  
95      @SuppressWarnings("UnusedDeclaration")
96      private volatile long totalPendingSize;
97  
98      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
99              AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
100 
101     @SuppressWarnings("UnusedDeclaration")
102     private volatile int unwritable;
103 
104     private volatile Runnable fireChannelWritabilityChangedTask;
105 
106     ChannelOutboundBuffer(AbstractChannel channel) {
107         this.channel = channel;
108     }
109 
110     /**
111      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
112      * the message was written.
113      */
114     public void addMessage(Object msg, int size, ChannelPromise promise) {
115         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
116         if (tailEntry == null) {
117             flushedEntry = null;
118         } else {
119             Entry tail = tailEntry;
120             tail.next = entry;
121         }
122         tailEntry = entry;
123         if (unflushedEntry == null) {
124             unflushedEntry = entry;
125         }
126 
127         // increment pending bytes after adding message to the unflushed arrays.
128         // See https://github.com/netty/netty/issues/1619
129         incrementPendingOutboundBytes(entry.pendingSize, false);
130     }
131 
132     /**
133      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
134      * and so you will be able to handle them.
135      */
136     public void addFlush() {
137         // There is no need to process all entries if there was already a flush before and no new messages
138         // where added in the meantime.
139         //
140         // See https://github.com/netty/netty/issues/2577
141         Entry entry = unflushedEntry;
142         if (entry != null) {
143             if (flushedEntry == null) {
144                 // there is no flushedEntry yet, so start with the entry
145                 flushedEntry = entry;
146             }
147             do {
148                 flushed ++;
149                 if (!entry.promise.setUncancellable()) {
150                     // Was cancelled so make sure we free up memory and notify about the freed bytes
151                     int pending = entry.cancel();
152                     decrementPendingOutboundBytes(pending, false, true);
153                 }
154                 entry = entry.next;
155             } while (entry != null);
156 
157             // All flushed so reset unflushedEntry
158             unflushedEntry = null;
159         }
160     }
161 
162     /**
163      * Increment the pending bytes which will be written at some point.
164      * This method is thread-safe!
165      */
166     void incrementPendingOutboundBytes(long size) {
167         incrementPendingOutboundBytes(size, true);
168     }
169 
170     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
171         if (size == 0) {
172             return;
173         }
174 
175         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
176         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
177             setUnwritable(invokeLater);
178         }
179     }
180 
181     /**
182      * Decrement the pending bytes which will be written at some point.
183      * This method is thread-safe!
184      */
185     void decrementPendingOutboundBytes(long size) {
186         decrementPendingOutboundBytes(size, true, true);
187     }
188 
189     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
190         if (size == 0) {
191             return;
192         }
193 
194         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
195         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
196             setWritable(invokeLater);
197         }
198     }
199 
200     private static long total(Object msg) {
201         if (msg instanceof ByteBuf) {
202             return ((ByteBuf) msg).readableBytes();
203         }
204         if (msg instanceof FileRegion) {
205             return ((FileRegion) msg).count();
206         }
207         if (msg instanceof ByteBufHolder) {
208             return ((ByteBufHolder) msg).content().readableBytes();
209         }
210         return -1;
211     }
212 
213     /**
214      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
215      */
216     public Object current() {
217         Entry entry = flushedEntry;
218         if (entry == null) {
219             return null;
220         }
221 
222         return entry.msg;
223     }
224 
225     /**
226      * Return the current message flush progress.
227      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
228      */
229     public long currentProgress() {
230         Entry entry = flushedEntry;
231         if (entry == null) {
232             return 0;
233         }
234         return entry.progress;
235     }
236 
237     /**
238      * Notify the {@link ChannelPromise} of the current message about writing progress.
239      */
240     public void progress(long amount) {
241         Entry e = flushedEntry;
242         assert e != null;
243         ChannelPromise p = e.promise;
244         long progress = e.progress + amount;
245         e.progress = progress;
246         if (p instanceof ChannelProgressivePromise) {
247             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
248         }
249     }
250 
251     /**
252      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
253      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
254      * messages are ready to be handled.
255      */
256     public boolean remove() {
257         Entry e = flushedEntry;
258         if (e == null) {
259             clearNioBuffers();
260             return false;
261         }
262         Object msg = e.msg;
263 
264         ChannelPromise promise = e.promise;
265         int size = e.pendingSize;
266 
267         removeEntry(e);
268 
269         if (!e.cancelled) {
270             // only release message, notify and decrement if it was not canceled before.
271             ReferenceCountUtil.safeRelease(msg);
272             safeSuccess(promise);
273             decrementPendingOutboundBytes(size, false, true);
274         }
275 
276         // recycle the entry
277         e.recycle();
278 
279         return true;
280     }
281 
282     /**
283      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
284      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
285      * {@code false} to signal that no more messages are ready to be handled.
286      */
287     public boolean remove(Throwable cause) {
288         return remove0(cause, true);
289     }
290 
291     private boolean remove0(Throwable cause, boolean notifyWritability) {
292         Entry e = flushedEntry;
293         if (e == null) {
294             clearNioBuffers();
295             return false;
296         }
297         Object msg = e.msg;
298 
299         ChannelPromise promise = e.promise;
300         int size = e.pendingSize;
301 
302         removeEntry(e);
303 
304         if (!e.cancelled) {
305             // only release message, fail and decrement if it was not canceled before.
306             ReferenceCountUtil.safeRelease(msg);
307 
308             safeFail(promise, cause);
309             decrementPendingOutboundBytes(size, false, notifyWritability);
310         }
311 
312         // recycle the entry
313         e.recycle();
314 
315         return true;
316     }
317 
318     private void removeEntry(Entry e) {
319         if (-- flushed == 0) {
320             // processed everything
321             flushedEntry = null;
322             if (e == tailEntry) {
323                 tailEntry = null;
324                 unflushedEntry = null;
325             }
326         } else {
327             flushedEntry = e.next;
328         }
329     }
330 
331     /**
332      * Removes the fully written entries and update the reader index of the partially written entry.
333      * This operation assumes all messages in this buffer is {@link ByteBuf}.
334      */
335     public void removeBytes(long writtenBytes) {
336         for (;;) {
337             Object msg = current();
338             if (!(msg instanceof ByteBuf)) {
339                 assert writtenBytes == 0;
340                 break;
341             }
342 
343             final ByteBuf buf = (ByteBuf) msg;
344             final int readerIndex = buf.readerIndex();
345             final int readableBytes = buf.writerIndex() - readerIndex;
346 
347             if (readableBytes <= writtenBytes) {
348                 if (writtenBytes != 0) {
349                     progress(readableBytes);
350                     writtenBytes -= readableBytes;
351                 }
352                 remove();
353             } else { // readableBytes > writtenBytes
354                 if (writtenBytes != 0) {
355                     buf.readerIndex(readerIndex + (int) writtenBytes);
356                     progress(writtenBytes);
357                 }
358                 break;
359             }
360         }
361         clearNioBuffers();
362     }
363 
364     // Clear all ByteBuffer from the array so these can be GC'ed.
365     // See https://github.com/netty/netty/issues/3837
366     private void clearNioBuffers() {
367         int count = nioBufferCount;
368         if (count > 0) {
369             nioBufferCount = 0;
370             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
371         }
372     }
373 
374     /**
375      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
376      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
377      * array and the total number of readable bytes of the NIO buffers respectively.
378      * <p>
379      * Note that the returned array is reused and thus should not escape
380      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
381      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
382      * </p>
383      */
384     public ByteBuffer[] nioBuffers() {
385         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
386     }
387 
388     /**
389      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
390      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
391      * array and the total number of readable bytes of the NIO buffers respectively.
392      * <p>
393      * Note that the returned array is reused and thus should not escape
394      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
395      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
396      * </p>
397      * @param maxCount The maximum amount of buffers that will be added to the return value.
398      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
399      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
400      *                 in the return value to ensure write progress is made.
401      */
402     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
403         assert maxCount > 0;
404         assert maxBytes > 0;
405         long nioBufferSize = 0;
406         int nioBufferCount = 0;
407         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
408         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
409         Entry entry = flushedEntry;
410         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
411             if (!entry.cancelled) {
412                 ByteBuf buf = (ByteBuf) entry.msg;
413                 final int readerIndex = buf.readerIndex();
414                 final int readableBytes = buf.writerIndex() - readerIndex;
415 
416                 if (readableBytes > 0) {
417                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
418                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
419                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
420                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
421                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
422                         // on the architecture and kernel but to be safe we also enforce the limit here.
423                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
424                         // OS.
425                         //
426                         // See also:
427                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
428                         // - https://linux.die.net//man/2/writev
429                         break;
430                     }
431                     nioBufferSize += readableBytes;
432                     int count = entry.count;
433                     if (count == -1) {
434                         //noinspection ConstantValueVariableUse
435                         entry.count = count = buf.nioBufferCount();
436                     }
437                     int neededSpace = min(maxCount, nioBufferCount + count);
438                     if (neededSpace > nioBuffers.length) {
439                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
440                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
441                     }
442                     if (count == 1) {
443                         ByteBuffer nioBuf = entry.buf;
444                         if (nioBuf == null) {
445                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
446                             // derived buffer
447                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
448                         }
449                         nioBuffers[nioBufferCount++] = nioBuf;
450                     } else {
451                         // The code exists in an extra method to ensure the method is not too big to inline as this
452                         // branch is not very likely to get hit very frequently.
453                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
454                     }
455                     if (nioBufferCount >= maxCount) {
456                         break;
457                     }
458                 }
459             }
460             entry = entry.next;
461         }
462         this.nioBufferCount = nioBufferCount;
463         this.nioBufferSize = nioBufferSize;
464 
465         return nioBuffers;
466     }
467 
468     private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
469         ByteBuffer[] nioBufs = entry.bufs;
470         if (nioBufs == null) {
471             // cached ByteBuffers as they may be expensive to create in terms
472             // of Object allocation
473             entry.bufs = nioBufs = buf.nioBuffers();
474         }
475         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
476             ByteBuffer nioBuf = nioBufs[i];
477             if (nioBuf == null) {
478                 break;
479             } else if (!nioBuf.hasRemaining()) {
480                 continue;
481             }
482             nioBuffers[nioBufferCount++] = nioBuf;
483         }
484         return nioBufferCount;
485     }
486 
487     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
488         int newCapacity = array.length;
489         do {
490             // double capacity until it is big enough
491             // See https://github.com/netty/netty/issues/1890
492             newCapacity <<= 1;
493 
494             if (newCapacity < 0) {
495                 throw new IllegalStateException();
496             }
497 
498         } while (neededSpace > newCapacity);
499 
500         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
501         System.arraycopy(array, 0, newArray, 0, size);
502 
503         return newArray;
504     }
505 
506     /**
507      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
508      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
509      * was called.
510      */
511     public int nioBufferCount() {
512         return nioBufferCount;
513     }
514 
515     /**
516      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
517      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
518      * was called.
519      */
520     public long nioBufferSize() {
521         return nioBufferSize;
522     }
523 
524     /**
525      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
526      * not exceed the write watermark of the {@link Channel} and
527      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
528      * {@code false}.
529      */
530     public boolean isWritable() {
531         return unwritable == 0;
532     }
533 
534     /**
535      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
536      * {@code true}.
537      */
538     public boolean getUserDefinedWritability(int index) {
539         return (unwritable & writabilityMask(index)) == 0;
540     }
541 
542     /**
543      * Sets a user-defined writability flag at the specified index.
544      */
545     public void setUserDefinedWritability(int index, boolean writable) {
546         if (writable) {
547             setUserDefinedWritability(index);
548         } else {
549             clearUserDefinedWritability(index);
550         }
551     }
552 
553     private void setUserDefinedWritability(int index) {
554         final int mask = ~writabilityMask(index);
555         for (;;) {
556             final int oldValue = unwritable;
557             final int newValue = oldValue & mask;
558             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
559                 if (oldValue != 0 && newValue == 0) {
560                     fireChannelWritabilityChanged(true);
561                 }
562                 break;
563             }
564         }
565     }
566 
567     private void clearUserDefinedWritability(int index) {
568         final int mask = writabilityMask(index);
569         for (;;) {
570             final int oldValue = unwritable;
571             final int newValue = oldValue | mask;
572             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
573                 if (oldValue == 0 && newValue != 0) {
574                     fireChannelWritabilityChanged(true);
575                 }
576                 break;
577             }
578         }
579     }
580 
581     private static int writabilityMask(int index) {
582         if (index < 1 || index > 31) {
583             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
584         }
585         return 1 << index;
586     }
587 
588     private void setWritable(boolean invokeLater) {
589         for (;;) {
590             final int oldValue = unwritable;
591             final int newValue = oldValue & ~1;
592             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
593                 if (oldValue != 0 && newValue == 0) {
594                     fireChannelWritabilityChanged(invokeLater);
595                 }
596                 break;
597             }
598         }
599     }
600 
601     private void setUnwritable(boolean invokeLater) {
602         for (;;) {
603             final int oldValue = unwritable;
604             final int newValue = oldValue | 1;
605             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
606                 if (oldValue == 0) {
607                     fireChannelWritabilityChanged(invokeLater);
608                 }
609                 break;
610             }
611         }
612     }
613 
614     private void fireChannelWritabilityChanged(boolean invokeLater) {
615         final ChannelPipeline pipeline = channel.pipeline();
616         if (invokeLater) {
617             Runnable task = fireChannelWritabilityChangedTask;
618             if (task == null) {
619                 fireChannelWritabilityChangedTask = task = new Runnable() {
620                     @Override
621                     public void run() {
622                         pipeline.fireChannelWritabilityChanged();
623                     }
624                 };
625             }
626             channel.eventLoop().execute(task);
627         } else {
628             pipeline.fireChannelWritabilityChanged();
629         }
630     }
631 
632     /**
633      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
634      */
635     public int size() {
636         return flushed;
637     }
638 
639     /**
640      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
641      * otherwise.
642      */
643     public boolean isEmpty() {
644         return flushed == 0;
645     }
646 
647     void failFlushed(Throwable cause, boolean notify) {
648         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
649         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
650         // indirectly (usually by closing the channel.)
651         //
652         // See https://github.com/netty/netty/issues/1501
653         if (inFail) {
654             return;
655         }
656 
657         try {
658             inFail = true;
659             for (;;) {
660                 if (!remove0(cause, notify)) {
661                     break;
662                 }
663             }
664         } finally {
665             inFail = false;
666         }
667     }
668 
669     void close(final Throwable cause, final boolean allowChannelOpen) {
670         if (inFail) {
671             channel.eventLoop().execute(new Runnable() {
672                 @Override
673                 public void run() {
674                     close(cause, allowChannelOpen);
675                 }
676             });
677             return;
678         }
679 
680         inFail = true;
681 
682         if (!allowChannelOpen && channel.isOpen()) {
683             throw new IllegalStateException("close() must be invoked after the channel is closed.");
684         }
685 
686         if (!isEmpty()) {
687             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
688         }
689 
690         // Release all unflushed messages.
691         try {
692             Entry e = unflushedEntry;
693             while (e != null) {
694                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
695                 int size = e.pendingSize;
696                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
697 
698                 if (!e.cancelled) {
699                     ReferenceCountUtil.safeRelease(e.msg);
700                     safeFail(e.promise, cause);
701                 }
702                 e = e.recycleAndGetNext();
703             }
704         } finally {
705             inFail = false;
706         }
707         clearNioBuffers();
708     }
709 
710     void close(ClosedChannelException cause) {
711         close(cause, false);
712     }
713 
714     private static void safeSuccess(ChannelPromise promise) {
715         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
716         // false.
717         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
718     }
719 
720     private static void safeFail(ChannelPromise promise, Throwable cause) {
721         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
722         // false.
723         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
724     }
725 
726     @Deprecated
727     public void recycle() {
728         // NOOP
729     }
730 
731     public long totalPendingWriteBytes() {
732         return totalPendingSize;
733     }
734 
735     /**
736      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
737      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
738      */
739     public long bytesBeforeUnwritable() {
740         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
741         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
742         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
743         // together. totalPendingSize will be updated before isWritable().
744         if (bytes > 0) {
745             return isWritable() ? bytes : 0;
746         }
747         return 0;
748     }
749 
750     /**
751      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
752      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
753      */
754     public long bytesBeforeWritable() {
755         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
756         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
757         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
758         // together. totalPendingSize will be updated before isWritable().
759         if (bytes > 0) {
760             return isWritable() ? 0 : bytes;
761         }
762         return 0;
763     }
764 
765     /**
766      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
767      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
768      * returns {@code false} or there are no more flushed messages to process.
769      */
770     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
771         ObjectUtil.checkNotNull(processor, "processor");
772 
773         Entry entry = flushedEntry;
774         if (entry == null) {
775             return;
776         }
777 
778         do {
779             if (!entry.cancelled) {
780                 if (!processor.processMessage(entry.msg)) {
781                     return;
782                 }
783             }
784             entry = entry.next;
785         } while (isFlushedEntry(entry));
786     }
787 
788     private boolean isFlushedEntry(Entry e) {
789         return e != null && e != unflushedEntry;
790     }
791 
792     public interface MessageProcessor {
793         /**
794          * Will be called for each flushed message until it either there are no more flushed messages or this
795          * method returns {@code false}.
796          */
797         boolean processMessage(Object msg) throws Exception;
798     }
799 
800     static final class Entry {
801         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
802             @Override
803             public Entry newObject(Handle<Entry> handle) {
804                 return new Entry(handle);
805             }
806         });
807 
808         private final Handle<Entry> handle;
809         Entry next;
810         Object msg;
811         ByteBuffer[] bufs;
812         ByteBuffer buf;
813         ChannelPromise promise;
814         long progress;
815         long total;
816         int pendingSize;
817         int count = -1;
818         boolean cancelled;
819 
820         private Entry(Handle<Entry> handle) {
821             this.handle = handle;
822         }
823 
824         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
825             Entry entry = RECYCLER.get();
826             entry.msg = msg;
827             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
828             entry.total = total;
829             entry.promise = promise;
830             return entry;
831         }
832 
833         int cancel() {
834             if (!cancelled) {
835                 cancelled = true;
836                 int pSize = pendingSize;
837 
838                 // release message and replace with an empty buffer
839                 ReferenceCountUtil.safeRelease(msg);
840                 msg = Unpooled.EMPTY_BUFFER;
841 
842                 pendingSize = 0;
843                 total = 0;
844                 progress = 0;
845                 bufs = null;
846                 buf = null;
847                 return pSize;
848             }
849             return 0;
850         }
851 
852         void recycle() {
853             next = null;
854             bufs = null;
855             buf = null;
856             msg = null;
857             promise = null;
858             progress = 0;
859             total = 0;
860             pendingSize = 0;
861             count = -1;
862             cancelled = false;
863             handle.recycle(this);
864         }
865 
866         Entry recycleAndGetNext() {
867             Entry next = this.next;
868             recycle();
869             return next;
870         }
871     }
872 }