View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.socket.nio.NioSocketChannel;
22  import io.netty.util.Recycler;
23  import io.netty.util.Recycler.Handle;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.internal.InternalThreadLocalMap;
27  import io.netty.util.internal.PromiseNotificationUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.util.Arrays;
35  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
37  
38  import static java.lang.Math.min;
39  
40  /**
41   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
42   * outbound write requests.
43   * <p>
44   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
45   * <ul>
46   * <li>{@link #size()} and {@link #isEmpty()}</li>
47   * <li>{@link #isWritable()}</li>
48   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
49   * </ul>
50   * </p>
51   */
52  public final class ChannelOutboundBuffer {
53      // Assuming a 64-bit JVM:
54      //  - 16 bytes object header
55      //  - 8 reference fields
56      //  - 2 long fields
57      //  - 2 int fields
58      //  - 1 boolean field
59      //  - padding
60      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
61              SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
64  
65      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
66          @Override
67          protected ByteBuffer[] initialValue() throws Exception {
68              return new ByteBuffer[1024];
69          }
70      };
71  
72      private final Channel channel;
73  
74      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
75      //
76      // The Entry that is the first in the linked-list structure that was flushed
77      private Entry flushedEntry;
78      // The Entry which is the first unflushed in the linked-list structure
79      private Entry unflushedEntry;
80      // The Entry which represents the tail of the buffer
81      private Entry tailEntry;
82      // The number of flushed entries that are not written yet
83      private int flushed;
84  
85      private int nioBufferCount;
86      private long nioBufferSize;
87  
88      private boolean inFail;
89  
90      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
91              AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
92  
93      @SuppressWarnings("UnusedDeclaration")
94      private volatile long totalPendingSize;
95  
96      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
97              AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
98  
99      @SuppressWarnings("UnusedDeclaration")
100     private volatile int unwritable;
101 
102     private volatile Runnable fireChannelWritabilityChangedTask;
103 
104     ChannelOutboundBuffer(AbstractChannel channel) {
105         this.channel = channel;
106     }
107 
108     /**
109      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
110      * the message was written.
111      */
112     public void addMessage(Object msg, int size, ChannelPromise promise) {
113         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
114         if (tailEntry == null) {
115             flushedEntry = null;
116         } else {
117             Entry tail = tailEntry;
118             tail.next = entry;
119         }
120         tailEntry = entry;
121         if (unflushedEntry == null) {
122             unflushedEntry = entry;
123         }
124 
125         // increment pending bytes after adding message to the unflushed arrays.
126         // See https://github.com/netty/netty/issues/1619
127         incrementPendingOutboundBytes(entry.pendingSize, false);
128     }
129 
130     /**
131      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
132      * and so you will be able to handle them.
133      */
134     public void addFlush() {
135         // There is no need to process all entries if there was already a flush before and no new messages
136         // where added in the meantime.
137         //
138         // See https://github.com/netty/netty/issues/2577
139         Entry entry = unflushedEntry;
140         if (entry != null) {
141             if (flushedEntry == null) {
142                 // there is no flushedEntry yet, so start with the entry
143                 flushedEntry = entry;
144             }
145             do {
146                 flushed ++;
147                 if (!entry.promise.setUncancellable()) {
148                     // Was cancelled so make sure we free up memory and notify about the freed bytes
149                     int pending = entry.cancel();
150                     decrementPendingOutboundBytes(pending, false, true);
151                 }
152                 entry = entry.next;
153             } while (entry != null);
154 
155             // All flushed so reset unflushedEntry
156             unflushedEntry = null;
157         }
158     }
159 
160     /**
161      * Increment the pending bytes which will be written at some point.
162      * This method is thread-safe!
163      */
164     void incrementPendingOutboundBytes(long size) {
165         incrementPendingOutboundBytes(size, true);
166     }
167 
168     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
169         if (size == 0) {
170             return;
171         }
172 
173         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
174         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
175             setUnwritable(invokeLater);
176         }
177     }
178 
179     /**
180      * Decrement the pending bytes which will be written at some point.
181      * This method is thread-safe!
182      */
183     void decrementPendingOutboundBytes(long size) {
184         decrementPendingOutboundBytes(size, true, true);
185     }
186 
187     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
188         if (size == 0) {
189             return;
190         }
191 
192         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
193         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
194             setWritable(invokeLater);
195         }
196     }
197 
198     private static long total(Object msg) {
199         if (msg instanceof ByteBuf) {
200             return ((ByteBuf) msg).readableBytes();
201         }
202         if (msg instanceof FileRegion) {
203             return ((FileRegion) msg).count();
204         }
205         if (msg instanceof ByteBufHolder) {
206             return ((ByteBufHolder) msg).content().readableBytes();
207         }
208         return -1;
209     }
210 
211     /**
212      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
213      */
214     public Object current() {
215         Entry entry = flushedEntry;
216         if (entry == null) {
217             return null;
218         }
219 
220         return entry.msg;
221     }
222 
223     /**
224      * Return the current message flush progress.
225      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
226      */
227     public long currentProgress() {
228         Entry entry = flushedEntry;
229         if (entry == null) {
230             return 0;
231         }
232         return entry.progress;
233     }
234 
235     /**
236      * Notify the {@link ChannelPromise} of the current message about writing progress.
237      */
238     public void progress(long amount) {
239         Entry e = flushedEntry;
240         assert e != null;
241         ChannelPromise p = e.promise;
242         long progress = e.progress + amount;
243         e.progress = progress;
244         if (p instanceof ChannelProgressivePromise) {
245             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
246         }
247     }
248 
249     /**
250      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
251      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
252      * messages are ready to be handled.
253      */
254     public boolean remove() {
255         Entry e = flushedEntry;
256         if (e == null) {
257             clearNioBuffers();
258             return false;
259         }
260         Object msg = e.msg;
261 
262         ChannelPromise promise = e.promise;
263         int size = e.pendingSize;
264 
265         removeEntry(e);
266 
267         if (!e.cancelled) {
268             // only release message, notify and decrement if it was not canceled before.
269             ReferenceCountUtil.safeRelease(msg);
270             safeSuccess(promise);
271             decrementPendingOutboundBytes(size, false, true);
272         }
273 
274         // recycle the entry
275         e.recycle();
276 
277         return true;
278     }
279 
280     /**
281      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
282      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
283      * {@code false} to signal that no more messages are ready to be handled.
284      */
285     public boolean remove(Throwable cause) {
286         return remove0(cause, true);
287     }
288 
289     private boolean remove0(Throwable cause, boolean notifyWritability) {
290         Entry e = flushedEntry;
291         if (e == null) {
292             clearNioBuffers();
293             return false;
294         }
295         Object msg = e.msg;
296 
297         ChannelPromise promise = e.promise;
298         int size = e.pendingSize;
299 
300         removeEntry(e);
301 
302         if (!e.cancelled) {
303             // only release message, fail and decrement if it was not canceled before.
304             ReferenceCountUtil.safeRelease(msg);
305 
306             safeFail(promise, cause);
307             decrementPendingOutboundBytes(size, false, notifyWritability);
308         }
309 
310         // recycle the entry
311         e.recycle();
312 
313         return true;
314     }
315 
316     private void removeEntry(Entry e) {
317         if (-- flushed == 0) {
318             // processed everything
319             flushedEntry = null;
320             if (e == tailEntry) {
321                 tailEntry = null;
322                 unflushedEntry = null;
323             }
324         } else {
325             flushedEntry = e.next;
326         }
327     }
328 
329     /**
330      * Removes the fully written entries and update the reader index of the partially written entry.
331      * This operation assumes all messages in this buffer is {@link ByteBuf}.
332      */
333     public void removeBytes(long writtenBytes) {
334         for (;;) {
335             Object msg = current();
336             if (!(msg instanceof ByteBuf)) {
337                 assert writtenBytes == 0;
338                 break;
339             }
340 
341             final ByteBuf buf = (ByteBuf) msg;
342             final int readerIndex = buf.readerIndex();
343             final int readableBytes = buf.writerIndex() - readerIndex;
344 
345             if (readableBytes <= writtenBytes) {
346                 if (writtenBytes != 0) {
347                     progress(readableBytes);
348                     writtenBytes -= readableBytes;
349                 }
350                 remove();
351             } else { // readableBytes > writtenBytes
352                 if (writtenBytes != 0) {
353                     buf.readerIndex(readerIndex + (int) writtenBytes);
354                     progress(writtenBytes);
355                 }
356                 break;
357             }
358         }
359         clearNioBuffers();
360     }
361 
362     // Clear all ByteBuffer from the array so these can be GC'ed.
363     // See https://github.com/netty/netty/issues/3837
364     private void clearNioBuffers() {
365         int count = nioBufferCount;
366         if (count > 0) {
367             nioBufferCount = 0;
368             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
369         }
370     }
371 
372     /**
373      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
374      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
375      * array and the total number of readable bytes of the NIO buffers respectively.
376      * <p>
377      * Note that the returned array is reused and thus should not escape
378      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
379      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
380      * </p>
381      */
382     public ByteBuffer[] nioBuffers() {
383         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
384     }
385 
386     /**
387      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
388      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
389      * array and the total number of readable bytes of the NIO buffers respectively.
390      * <p>
391      * Note that the returned array is reused and thus should not escape
392      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
393      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
394      * </p>
395      * @param maxCount The maximum amount of buffers that will be added to the return value.
396      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
397      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
398      *                 in the return value to ensure write progress is made.
399      */
400     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
401         assert maxCount > 0;
402         assert maxBytes > 0;
403         long nioBufferSize = 0;
404         int nioBufferCount = 0;
405         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
406         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
407         Entry entry = flushedEntry;
408         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
409             if (!entry.cancelled) {
410                 ByteBuf buf = (ByteBuf) entry.msg;
411                 final int readerIndex = buf.readerIndex();
412                 final int readableBytes = buf.writerIndex() - readerIndex;
413 
414                 if (readableBytes > 0) {
415                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
416                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
417                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
418                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
419                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
420                         // on the architecture and kernel but to be safe we also enforce the limit here.
421                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
422                         // OS.
423                         //
424                         // See also:
425                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
426                         // - http://linux.die.net/man/2/writev
427                         break;
428                     }
429                     nioBufferSize += readableBytes;
430                     int count = entry.count;
431                     if (count == -1) {
432                         //noinspection ConstantValueVariableUse
433                         entry.count = count = buf.nioBufferCount();
434                     }
435                     int neededSpace = min(maxCount, nioBufferCount + count);
436                     if (neededSpace > nioBuffers.length) {
437                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
438                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
439                     }
440                     if (count == 1) {
441                         ByteBuffer nioBuf = entry.buf;
442                         if (nioBuf == null) {
443                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
444                             // derived buffer
445                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
446                         }
447                         nioBuffers[nioBufferCount++] = nioBuf;
448                     } else {
449                         // The code exists in an extra method to ensure the method is not too big to inline as this
450                         // branch is not very likely to get hit very frequently.
451                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
452                     }
453                     if (nioBufferCount == maxCount) {
454                         break;
455                     }
456                 }
457             }
458             entry = entry.next;
459         }
460         this.nioBufferCount = nioBufferCount;
461         this.nioBufferSize = nioBufferSize;
462 
463         return nioBuffers;
464     }
465 
466     private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
467         ByteBuffer[] nioBufs = entry.bufs;
468         if (nioBufs == null) {
469             // cached ByteBuffers as they may be expensive to create in terms
470             // of Object allocation
471             entry.bufs = nioBufs = buf.nioBuffers();
472         }
473         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
474             ByteBuffer nioBuf = nioBufs[i];
475             if (nioBuf == null) {
476                 break;
477             } else if (!nioBuf.hasRemaining()) {
478                 continue;
479             }
480             nioBuffers[nioBufferCount++] = nioBuf;
481         }
482         return nioBufferCount;
483     }
484 
485     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
486         int newCapacity = array.length;
487         do {
488             // double capacity until it is big enough
489             // See https://github.com/netty/netty/issues/1890
490             newCapacity <<= 1;
491 
492             if (newCapacity < 0) {
493                 throw new IllegalStateException();
494             }
495 
496         } while (neededSpace > newCapacity);
497 
498         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
499         System.arraycopy(array, 0, newArray, 0, size);
500 
501         return newArray;
502     }
503 
504     /**
505      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
506      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
507      * was called.
508      */
509     public int nioBufferCount() {
510         return nioBufferCount;
511     }
512 
513     /**
514      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
515      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
516      * was called.
517      */
518     public long nioBufferSize() {
519         return nioBufferSize;
520     }
521 
522     /**
523      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
524      * not exceed the write watermark of the {@link Channel} and
525      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
526      * {@code false}.
527      */
528     public boolean isWritable() {
529         return unwritable == 0;
530     }
531 
532     /**
533      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
534      * {@code true}.
535      */
536     public boolean getUserDefinedWritability(int index) {
537         return (unwritable & writabilityMask(index)) == 0;
538     }
539 
540     /**
541      * Sets a user-defined writability flag at the specified index.
542      */
543     public void setUserDefinedWritability(int index, boolean writable) {
544         if (writable) {
545             setUserDefinedWritability(index);
546         } else {
547             clearUserDefinedWritability(index);
548         }
549     }
550 
551     private void setUserDefinedWritability(int index) {
552         final int mask = ~writabilityMask(index);
553         for (;;) {
554             final int oldValue = unwritable;
555             final int newValue = oldValue & mask;
556             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
557                 if (oldValue != 0 && newValue == 0) {
558                     fireChannelWritabilityChanged(true);
559                 }
560                 break;
561             }
562         }
563     }
564 
565     private void clearUserDefinedWritability(int index) {
566         final int mask = writabilityMask(index);
567         for (;;) {
568             final int oldValue = unwritable;
569             final int newValue = oldValue | mask;
570             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
571                 if (oldValue == 0 && newValue != 0) {
572                     fireChannelWritabilityChanged(true);
573                 }
574                 break;
575             }
576         }
577     }
578 
579     private static int writabilityMask(int index) {
580         if (index < 1 || index > 31) {
581             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
582         }
583         return 1 << index;
584     }
585 
586     private void setWritable(boolean invokeLater) {
587         for (;;) {
588             final int oldValue = unwritable;
589             final int newValue = oldValue & ~1;
590             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
591                 if (oldValue != 0 && newValue == 0) {
592                     fireChannelWritabilityChanged(invokeLater);
593                 }
594                 break;
595             }
596         }
597     }
598 
599     private void setUnwritable(boolean invokeLater) {
600         for (;;) {
601             final int oldValue = unwritable;
602             final int newValue = oldValue | 1;
603             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
604                 if (oldValue == 0 && newValue != 0) {
605                     fireChannelWritabilityChanged(invokeLater);
606                 }
607                 break;
608             }
609         }
610     }
611 
612     private void fireChannelWritabilityChanged(boolean invokeLater) {
613         final ChannelPipeline pipeline = channel.pipeline();
614         if (invokeLater) {
615             Runnable task = fireChannelWritabilityChangedTask;
616             if (task == null) {
617                 fireChannelWritabilityChangedTask = task = new Runnable() {
618                     @Override
619                     public void run() {
620                         pipeline.fireChannelWritabilityChanged();
621                     }
622                 };
623             }
624             channel.eventLoop().execute(task);
625         } else {
626             pipeline.fireChannelWritabilityChanged();
627         }
628     }
629 
630     /**
631      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
632      */
633     public int size() {
634         return flushed;
635     }
636 
637     /**
638      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
639      * otherwise.
640      */
641     public boolean isEmpty() {
642         return flushed == 0;
643     }
644 
645     void failFlushed(Throwable cause, boolean notify) {
646         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
647         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
648         // indirectly (usually by closing the channel.)
649         //
650         // See https://github.com/netty/netty/issues/1501
651         if (inFail) {
652             return;
653         }
654 
655         try {
656             inFail = true;
657             for (;;) {
658                 if (!remove0(cause, notify)) {
659                     break;
660                 }
661             }
662         } finally {
663             inFail = false;
664         }
665     }
666 
667     void close(final Throwable cause, final boolean allowChannelOpen) {
668         if (inFail) {
669             channel.eventLoop().execute(new Runnable() {
670                 @Override
671                 public void run() {
672                     close(cause, allowChannelOpen);
673                 }
674             });
675             return;
676         }
677 
678         inFail = true;
679 
680         if (!allowChannelOpen && channel.isOpen()) {
681             throw new IllegalStateException("close() must be invoked after the channel is closed.");
682         }
683 
684         if (!isEmpty()) {
685             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
686         }
687 
688         // Release all unflushed messages.
689         try {
690             Entry e = unflushedEntry;
691             while (e != null) {
692                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
693                 int size = e.pendingSize;
694                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
695 
696                 if (!e.cancelled) {
697                     ReferenceCountUtil.safeRelease(e.msg);
698                     safeFail(e.promise, cause);
699                 }
700                 e = e.recycleAndGetNext();
701             }
702         } finally {
703             inFail = false;
704         }
705         clearNioBuffers();
706     }
707 
708     void close(ClosedChannelException cause) {
709         close(cause, false);
710     }
711 
712     private static void safeSuccess(ChannelPromise promise) {
713         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
714         // false.
715         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
716     }
717 
718     private static void safeFail(ChannelPromise promise, Throwable cause) {
719         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
720         // false.
721         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
722     }
723 
724     @Deprecated
725     public void recycle() {
726         // NOOP
727     }
728 
729     public long totalPendingWriteBytes() {
730         return totalPendingSize;
731     }
732 
733     /**
734      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
735      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
736      */
737     public long bytesBeforeUnwritable() {
738         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
739         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
740         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
741         // together. totalPendingSize will be updated before isWritable().
742         if (bytes > 0) {
743             return isWritable() ? bytes : 0;
744         }
745         return 0;
746     }
747 
748     /**
749      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
750      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
751      */
752     public long bytesBeforeWritable() {
753         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
754         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
755         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
756         // together. totalPendingSize will be updated before isWritable().
757         if (bytes > 0) {
758             return isWritable() ? 0 : bytes;
759         }
760         return 0;
761     }
762 
763     /**
764      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
765      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
766      * returns {@code false} or there are no more flushed messages to process.
767      */
768     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
769         if (processor == null) {
770             throw new NullPointerException("processor");
771         }
772 
773         Entry entry = flushedEntry;
774         if (entry == null) {
775             return;
776         }
777 
778         do {
779             if (!entry.cancelled) {
780                 if (!processor.processMessage(entry.msg)) {
781                     return;
782                 }
783             }
784             entry = entry.next;
785         } while (isFlushedEntry(entry));
786     }
787 
788     private boolean isFlushedEntry(Entry e) {
789         return e != null && e != unflushedEntry;
790     }
791 
792     public interface MessageProcessor {
793         /**
794          * Will be called for each flushed message until it either there are no more flushed messages or this
795          * method returns {@code false}.
796          */
797         boolean processMessage(Object msg) throws Exception;
798     }
799 
800     static final class Entry {
801         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
802             @Override
803             protected Entry newObject(Handle<Entry> handle) {
804                 return new Entry(handle);
805             }
806         };
807 
808         private final Handle<Entry> handle;
809         Entry next;
810         Object msg;
811         ByteBuffer[] bufs;
812         ByteBuffer buf;
813         ChannelPromise promise;
814         long progress;
815         long total;
816         int pendingSize;
817         int count = -1;
818         boolean cancelled;
819 
820         private Entry(Handle<Entry> handle) {
821             this.handle = handle;
822         }
823 
824         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
825             Entry entry = RECYCLER.get();
826             entry.msg = msg;
827             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
828             entry.total = total;
829             entry.promise = promise;
830             return entry;
831         }
832 
833         int cancel() {
834             if (!cancelled) {
835                 cancelled = true;
836                 int pSize = pendingSize;
837 
838                 // release message and replace with an empty buffer
839                 ReferenceCountUtil.safeRelease(msg);
840                 msg = Unpooled.EMPTY_BUFFER;
841 
842                 pendingSize = 0;
843                 total = 0;
844                 progress = 0;
845                 bufs = null;
846                 buf = null;
847                 return pSize;
848             }
849             return 0;
850         }
851 
852         void recycle() {
853             next = null;
854             bufs = null;
855             buf = null;
856             msg = null;
857             promise = null;
858             progress = 0;
859             total = 0;
860             pendingSize = 0;
861             count = -1;
862             cancelled = false;
863             handle.recycle(this);
864         }
865 
866         Entry recycleAndGetNext() {
867             Entry next = this.next;
868             recycle();
869             return next;
870         }
871     }
872 }