View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.AbstractReferenceCountedByteBuf;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.socket.nio.NioSocketChannel;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.internal.InternalThreadLocalMap;
27  import io.netty.util.internal.ObjectPool;
28  import io.netty.util.internal.ObjectPool.Handle;
29  import io.netty.util.internal.ObjectPool.ObjectCreator;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PromiseNotificationUtil;
32  import io.netty.util.internal.SystemPropertyUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.nio.ByteBuffer;
37  import java.nio.channels.ClosedChannelException;
38  import java.util.Arrays;
39  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
41  
42  import static java.lang.Math.min;
43  
44  /**
45   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
46   * outbound write requests.
47   * <p>
48   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
49   * <ul>
50   * <li>{@link #isWritable()}</li>
51   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
52   * </ul>
53   * </p>
54   */
55  public final class ChannelOutboundBuffer {
56      // Assuming a 64-bit JVM:
57      //  - 16 bytes object header
58      //  - 6 reference fields
59      //  - 2 long fields
60      //  - 2 int fields
61      //  - 1 boolean field
62      //  - padding
63      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
64              SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
65  
66      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
67  
68      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
69          @Override
70          protected ByteBuffer[] initialValue() throws Exception {
71              return new ByteBuffer[1024];
72          }
73      };
74  
75      private final Channel channel;
76  
77      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
78      //
79      // The Entry that is the first in the linked-list structure that was flushed
80      private Entry flushedEntry;
81      // The Entry which is the first unflushed in the linked-list structure
82      private Entry unflushedEntry;
83      // The Entry which represents the tail of the buffer
84      private Entry tailEntry;
85      // The number of flushed entries that are not written yet
86      private int flushed;
87  
88      private int nioBufferCount;
89      private long nioBufferSize;
90  
91      private boolean inFail;
92  
93      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
94              AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
95  
96      @SuppressWarnings("UnusedDeclaration")
97      private volatile long totalPendingSize;
98  
99      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
100             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
101 
102     @SuppressWarnings("UnusedDeclaration")
103     private volatile int unwritable;
104 
105     private volatile Runnable fireChannelWritabilityChangedTask;
106 
107     ChannelOutboundBuffer(AbstractChannel channel) {
108         this.channel = channel;
109     }
110 
111     /**
112      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
113      * the message was written.
114      */
115     public void addMessage(Object msg, int size, ChannelPromise promise) {
116         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
117         if (tailEntry == null) {
118             flushedEntry = null;
119         } else {
120             Entry tail = tailEntry;
121             tail.next = entry;
122         }
123         tailEntry = entry;
124         if (unflushedEntry == null) {
125             unflushedEntry = entry;
126         }
127 
128         // Touch the message to make it easier to debug buffer leaks.
129 
130         // this save both checking against the ReferenceCounted interface
131         // and makes better use of virtual calls vs interface ones
132         if (msg instanceof AbstractReferenceCountedByteBuf) {
133             ((AbstractReferenceCountedByteBuf) msg).touch();
134         } else {
135             ReferenceCountUtil.touch(msg);
136         }
137 
138         // increment pending bytes after adding message to the unflushed arrays.
139         // See https://github.com/netty/netty/issues/1619
140         incrementPendingOutboundBytes(entry.pendingSize, false);
141     }
142 
143     /**
144      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
145      * and so you will be able to handle them.
146      */
147     public void addFlush() {
148         // There is no need to process all entries if there was already a flush before and no new messages
149         // where added in the meantime.
150         //
151         // See https://github.com/netty/netty/issues/2577
152         Entry entry = unflushedEntry;
153         if (entry != null) {
154             if (flushedEntry == null) {
155                 // there is no flushedEntry yet, so start with the entry
156                 flushedEntry = entry;
157             }
158             do {
159                 flushed ++;
160                 if (!entry.promise.setUncancellable()) {
161                     // Was cancelled so make sure we free up memory and notify about the freed bytes
162                     int pending = entry.cancel();
163                     decrementPendingOutboundBytes(pending, false, true);
164                 }
165                 entry = entry.next;
166             } while (entry != null);
167 
168             // All flushed so reset unflushedEntry
169             unflushedEntry = null;
170         }
171     }
172 
173     /**
174      * Increment the pending bytes which will be written at some point.
175      * This method is thread-safe!
176      */
177     void incrementPendingOutboundBytes(long size) {
178         incrementPendingOutboundBytes(size, true);
179     }
180 
181     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
182         if (size == 0) {
183             return;
184         }
185 
186         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
187         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
188             setUnwritable(invokeLater);
189         }
190     }
191 
192     /**
193      * Decrement the pending bytes which will be written at some point.
194      * This method is thread-safe!
195      */
196     void decrementPendingOutboundBytes(long size) {
197         decrementPendingOutboundBytes(size, true, true);
198     }
199 
200     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
201         if (size == 0) {
202             return;
203         }
204 
205         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
206         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
207             setWritable(invokeLater);
208         }
209     }
210 
211     private static long total(Object msg) {
212         if (msg instanceof ByteBuf) {
213             return ((ByteBuf) msg).readableBytes();
214         }
215         if (msg instanceof FileRegion) {
216             return ((FileRegion) msg).count();
217         }
218         if (msg instanceof ByteBufHolder) {
219             return ((ByteBufHolder) msg).content().readableBytes();
220         }
221         return -1;
222     }
223 
224     /**
225      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
226      */
227     public Object current() {
228         Entry entry = flushedEntry;
229         if (entry == null) {
230             return null;
231         }
232 
233         return entry.msg;
234     }
235 
236     /**
237      * Return the current message flush progress.
238      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
239      */
240     public long currentProgress() {
241         Entry entry = flushedEntry;
242         if (entry == null) {
243             return 0;
244         }
245         return entry.progress;
246     }
247 
248     /**
249      * Notify the {@link ChannelPromise} of the current message about writing progress.
250      */
251     public void progress(long amount) {
252         Entry e = flushedEntry;
253         assert e != null;
254         ChannelPromise p = e.promise;
255         long progress = e.progress + amount;
256         e.progress = progress;
257         assert p != null;
258         final Class<?> promiseClass = p.getClass();
259         // fast-path to save O(n) ChannelProgressivePromise's type check on OpenJDK
260         if (promiseClass == VoidChannelPromise.class || promiseClass == DefaultChannelPromise.class) {
261             return;
262         }
263         // this is going to save from type pollution due to https://bugs.openjdk.org/browse/JDK-8180450
264         if (p instanceof DefaultChannelProgressivePromise) {
265             ((DefaultChannelProgressivePromise) p).tryProgress(progress, e.total);
266         } else if (p instanceof ChannelProgressivePromise) {
267             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
268         }
269     }
270 
271     /**
272      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
273      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
274      * messages are ready to be handled.
275      */
276     public boolean remove() {
277         Entry e = flushedEntry;
278         if (e == null) {
279             clearNioBuffers();
280             return false;
281         }
282         Object msg = e.msg;
283 
284         ChannelPromise promise = e.promise;
285         int size = e.pendingSize;
286 
287         removeEntry(e);
288 
289         // only release message, notify and decrement if it was not canceled before.
290         if (!e.cancelled) {
291             // this save both checking against the ReferenceCounted interface
292             // and makes better use of virtual calls vs interface ones
293             if (msg instanceof AbstractReferenceCountedByteBuf) {
294                 try {
295                     // release now as it is flushed.
296                     ((AbstractReferenceCountedByteBuf) msg).release();
297                 } catch (Throwable t) {
298                     logger.warn("Failed to release a ByteBuf: {}", msg, t);
299                 }
300             } else {
301                 ReferenceCountUtil.safeRelease(msg);
302             }
303             safeSuccess(promise);
304             decrementPendingOutboundBytes(size, false, true);
305         }
306 
307         // recycle the entry
308         e.unguardedRecycle();
309 
310         return true;
311     }
312 
313     /**
314      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
315      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
316      * {@code false} to signal that no more messages are ready to be handled.
317      */
318     public boolean remove(Throwable cause) {
319         return remove0(cause, true);
320     }
321 
322     private boolean remove0(Throwable cause, boolean notifyWritability) {
323         Entry e = flushedEntry;
324         if (e == null) {
325             clearNioBuffers();
326             return false;
327         }
328         Object msg = e.msg;
329 
330         ChannelPromise promise = e.promise;
331         int size = e.pendingSize;
332 
333         removeEntry(e);
334 
335         if (!e.cancelled) {
336             // only release message, fail and decrement if it was not canceled before.
337             ReferenceCountUtil.safeRelease(msg);
338 
339             safeFail(promise, cause);
340             decrementPendingOutboundBytes(size, false, notifyWritability);
341         }
342 
343         // recycle the entry
344         e.unguardedRecycle();
345 
346         return true;
347     }
348 
349     private void removeEntry(Entry e) {
350         if (-- flushed == 0) {
351             // processed everything
352             flushedEntry = null;
353             if (e == tailEntry) {
354                 tailEntry = null;
355                 unflushedEntry = null;
356             }
357         } else {
358             flushedEntry = e.next;
359         }
360     }
361 
362     /**
363      * Removes the fully written entries and update the reader index of the partially written entry.
364      * This operation assumes all messages in this buffer is {@link ByteBuf}.
365      */
366     public void removeBytes(long writtenBytes) {
367         for (;;) {
368             Object msg = current();
369             if (!(msg instanceof ByteBuf)) {
370                 assert writtenBytes == 0;
371                 break;
372             }
373 
374             final ByteBuf buf = (ByteBuf) msg;
375             final int readerIndex = buf.readerIndex();
376             final int readableBytes = buf.writerIndex() - readerIndex;
377 
378             if (readableBytes <= writtenBytes) {
379                 if (writtenBytes != 0) {
380                     progress(readableBytes);
381                     writtenBytes -= readableBytes;
382                 }
383                 remove();
384             } else { // readableBytes > writtenBytes
385                 if (writtenBytes != 0) {
386                     buf.readerIndex(readerIndex + (int) writtenBytes);
387                     progress(writtenBytes);
388                 }
389                 break;
390             }
391         }
392         clearNioBuffers();
393     }
394 
395     // Clear all ByteBuffer from the array so these can be GC'ed.
396     // See https://github.com/netty/netty/issues/3837
397     private void clearNioBuffers() {
398         int count = nioBufferCount;
399         if (count > 0) {
400             nioBufferCount = 0;
401             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
402         }
403     }
404 
405     /**
406      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
407      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
408      * array and the total number of readable bytes of the NIO buffers respectively.
409      * <p>
410      * Note that the returned array is reused and thus should not escape
411      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
412      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
413      * </p>
414      */
415     public ByteBuffer[] nioBuffers() {
416         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
417     }
418 
419     /**
420      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
421      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
422      * array and the total number of readable bytes of the NIO buffers respectively.
423      * <p>
424      * Note that the returned array is reused and thus should not escape
425      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
426      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
427      * </p>
428      * @param maxCount The maximum amount of buffers that will be added to the return value.
429      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
430      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
431      *                 in the return value to ensure write progress is made.
432      */
433     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
434         assert maxCount > 0;
435         assert maxBytes > 0;
436         long nioBufferSize = 0;
437         int nioBufferCount = 0;
438         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
439         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
440         Entry entry = flushedEntry;
441         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
442             if (!entry.cancelled) {
443                 ByteBuf buf = (ByteBuf) entry.msg;
444                 final int readerIndex = buf.readerIndex();
445                 final int readableBytes = buf.writerIndex() - readerIndex;
446 
447                 if (readableBytes > 0) {
448                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
449                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
450                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
451                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
452                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
453                         // on the architecture and kernel but to be safe we also enforce the limit here.
454                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
455                         // OS.
456                         //
457                         // See also:
458                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
459                         // - https://linux.die.net//man/2/writev
460                         break;
461                     }
462                     nioBufferSize += readableBytes;
463                     int count = entry.count;
464                     if (count == -1) {
465                         //noinspection ConstantValueVariableUse
466                         entry.count = count = buf.nioBufferCount();
467                     }
468                     int neededSpace = min(maxCount, nioBufferCount + count);
469                     if (neededSpace > nioBuffers.length) {
470                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
471                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
472                     }
473                     if (count == 1) {
474                         ByteBuffer nioBuf = entry.buf;
475                         if (nioBuf == null) {
476                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
477                             // derived buffer
478                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
479                         }
480                         nioBuffers[nioBufferCount++] = nioBuf;
481                     } else {
482                         // The code exists in an extra method to ensure the method is not too big to inline as this
483                         // branch is not very likely to get hit very frequently.
484                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
485                     }
486                     if (nioBufferCount >= maxCount) {
487                         break;
488                     }
489                 }
490             }
491             entry = entry.next;
492         }
493         this.nioBufferCount = nioBufferCount;
494         this.nioBufferSize = nioBufferSize;
495 
496         return nioBuffers;
497     }
498 
499     private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
500         ByteBuffer[] nioBufs = entry.bufs;
501         if (nioBufs == null) {
502             // cached ByteBuffers as they may be expensive to create in terms
503             // of Object allocation
504             entry.bufs = nioBufs = buf.nioBuffers();
505         }
506         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
507             ByteBuffer nioBuf = nioBufs[i];
508             if (nioBuf == null) {
509                 break;
510             } else if (!nioBuf.hasRemaining()) {
511                 continue;
512             }
513             nioBuffers[nioBufferCount++] = nioBuf;
514         }
515         return nioBufferCount;
516     }
517 
518     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
519         int newCapacity = array.length;
520         do {
521             // double capacity until it is big enough
522             // See https://github.com/netty/netty/issues/1890
523             newCapacity <<= 1;
524 
525             if (newCapacity < 0) {
526                 throw new IllegalStateException();
527             }
528 
529         } while (neededSpace > newCapacity);
530 
531         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
532         System.arraycopy(array, 0, newArray, 0, size);
533 
534         return newArray;
535     }
536 
537     /**
538      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
539      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
540      * was called.
541      */
542     public int nioBufferCount() {
543         return nioBufferCount;
544     }
545 
546     /**
547      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
548      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
549      * was called.
550      */
551     public long nioBufferSize() {
552         return nioBufferSize;
553     }
554 
555     /**
556      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
557      * not exceed the write watermark of the {@link Channel} and
558      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
559      * {@code false}.
560      */
561     public boolean isWritable() {
562         return unwritable == 0;
563     }
564 
565     /**
566      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
567      * {@code true}.
568      */
569     public boolean getUserDefinedWritability(int index) {
570         return (unwritable & writabilityMask(index)) == 0;
571     }
572 
573     /**
574      * Sets a user-defined writability flag at the specified index.
575      */
576     public void setUserDefinedWritability(int index, boolean writable) {
577         if (writable) {
578             setUserDefinedWritability(index);
579         } else {
580             clearUserDefinedWritability(index);
581         }
582     }
583 
584     private void setUserDefinedWritability(int index) {
585         final int mask = ~writabilityMask(index);
586         for (;;) {
587             final int oldValue = unwritable;
588             final int newValue = oldValue & mask;
589             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
590                 if (oldValue != 0 && newValue == 0) {
591                     fireChannelWritabilityChanged(true);
592                 }
593                 break;
594             }
595         }
596     }
597 
598     private void clearUserDefinedWritability(int index) {
599         final int mask = writabilityMask(index);
600         for (;;) {
601             final int oldValue = unwritable;
602             final int newValue = oldValue | mask;
603             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
604                 if (oldValue == 0 && newValue != 0) {
605                     fireChannelWritabilityChanged(true);
606                 }
607                 break;
608             }
609         }
610     }
611 
612     private static int writabilityMask(int index) {
613         if (index < 1 || index > 31) {
614             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
615         }
616         return 1 << index;
617     }
618 
619     private void setWritable(boolean invokeLater) {
620         for (;;) {
621             final int oldValue = unwritable;
622             final int newValue = oldValue & ~1;
623             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
624                 if (oldValue != 0 && newValue == 0) {
625                     fireChannelWritabilityChanged(invokeLater);
626                 }
627                 break;
628             }
629         }
630     }
631 
632     private void setUnwritable(boolean invokeLater) {
633         for (;;) {
634             final int oldValue = unwritable;
635             final int newValue = oldValue | 1;
636             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
637                 if (oldValue == 0) {
638                     fireChannelWritabilityChanged(invokeLater);
639                 }
640                 break;
641             }
642         }
643     }
644 
645     private void fireChannelWritabilityChanged(boolean invokeLater) {
646         final ChannelPipeline pipeline = channel.pipeline();
647         if (invokeLater) {
648             Runnable task = fireChannelWritabilityChangedTask;
649             if (task == null) {
650                 fireChannelWritabilityChangedTask = task = new Runnable() {
651                     @Override
652                     public void run() {
653                         pipeline.fireChannelWritabilityChanged();
654                     }
655                 };
656             }
657             channel.eventLoop().execute(task);
658         } else {
659             pipeline.fireChannelWritabilityChanged();
660         }
661     }
662 
663     /**
664      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
665      */
666     public int size() {
667         return flushed;
668     }
669 
670     /**
671      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
672      * otherwise.
673      */
674     public boolean isEmpty() {
675         return flushed == 0;
676     }
677 
678     void failFlushed(Throwable cause, boolean notify) {
679         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
680         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
681         // indirectly (usually by closing the channel.)
682         //
683         // See https://github.com/netty/netty/issues/1501
684         if (inFail) {
685             return;
686         }
687 
688         try {
689             inFail = true;
690             for (;;) {
691                 if (!remove0(cause, notify)) {
692                     break;
693                 }
694             }
695         } finally {
696             inFail = false;
697         }
698     }
699 
700     void close(final Throwable cause, final boolean allowChannelOpen) {
701         if (inFail) {
702             channel.eventLoop().execute(new Runnable() {
703                 @Override
704                 public void run() {
705                     close(cause, allowChannelOpen);
706                 }
707             });
708             return;
709         }
710 
711         inFail = true;
712 
713         if (!allowChannelOpen && channel.isOpen()) {
714             throw new IllegalStateException("close() must be invoked after the channel is closed.");
715         }
716 
717         if (!isEmpty()) {
718             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
719         }
720 
721         // Release all unflushed messages.
722         try {
723             Entry e = unflushedEntry;
724             while (e != null) {
725                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
726                 int size = e.pendingSize;
727                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
728 
729                 if (!e.cancelled) {
730                     ReferenceCountUtil.safeRelease(e.msg);
731                     safeFail(e.promise, cause);
732                 }
733                 e = e.unguardedRecycleAndGetNext();
734             }
735         } finally {
736             inFail = false;
737         }
738         clearNioBuffers();
739     }
740 
741     void close(ClosedChannelException cause) {
742         close(cause, false);
743     }
744 
745     private static void safeSuccess(ChannelPromise promise) {
746         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
747         // false.
748         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
749     }
750 
751     private static void safeFail(ChannelPromise promise, Throwable cause) {
752         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
753         // false.
754         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
755     }
756 
757     @Deprecated
758     public void recycle() {
759         // NOOP
760     }
761 
762     public long totalPendingWriteBytes() {
763         return totalPendingSize;
764     }
765 
766     /**
767      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
768      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
769      */
770     public long bytesBeforeUnwritable() {
771         // +1 because writability doesn't change until the threshold is crossed (not equal to).
772         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
773         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
774         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
775         // together. totalPendingSize will be updated before isWritable().
776         return bytes > 0 && isWritable() ? bytes : 0;
777     }
778 
779     /**
780      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
781      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
782      */
783     public long bytesBeforeWritable() {
784         // +1 because writability doesn't change until the threshold is crossed (not equal to).
785         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark() + 1;
786         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
787         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
788         // together. totalPendingSize will be updated before isWritable().
789         return bytes <= 0 || isWritable() ? 0 : bytes;
790     }
791 
792     /**
793      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
794      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
795      * returns {@code false} or there are no more flushed messages to process.
796      */
797     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
798         ObjectUtil.checkNotNull(processor, "processor");
799 
800         Entry entry = flushedEntry;
801         if (entry == null) {
802             return;
803         }
804 
805         do {
806             if (!entry.cancelled) {
807                 if (!processor.processMessage(entry.msg)) {
808                     return;
809                 }
810             }
811             entry = entry.next;
812         } while (isFlushedEntry(entry));
813     }
814 
815     private boolean isFlushedEntry(Entry e) {
816         return e != null && e != unflushedEntry;
817     }
818 
    /**
     * Callback used by {@link #forEachFlushedMessage(MessageProcessor)} to visit flushed messages.
     */
    public interface MessageProcessor {
        /**
         * Will be called for each flushed message until either there are no more flushed messages or this
         * method returns {@code false}.
         *
         * @param msg the flushed message being visited
         * @return {@code true} to continue with the next flushed message, {@code false} to stop
         * @throws Exception if processing the message fails
         */
        boolean processMessage(Object msg) throws Exception;
    }
826 
827     static final class Entry {
828         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
829             @Override
830             public Entry newObject(Handle<Entry> handle) {
831                 return new Entry(handle);
832             }
833         });
834 
835         private final EnhancedHandle<Entry> handle;
836         Entry next;
837         Object msg;
838         ByteBuffer[] bufs;
839         ByteBuffer buf;
840         ChannelPromise promise;
841         long progress;
842         long total;
843         int pendingSize;
844         int count = -1;
845         boolean cancelled;
846 
847         private Entry(Handle<Entry> handle) {
848             this.handle = (EnhancedHandle<Entry>) handle;
849         }
850 
851         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
852             Entry entry = RECYCLER.get();
853             entry.msg = msg;
854             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
855             entry.total = total;
856             entry.promise = promise;
857             return entry;
858         }
859 
860         int cancel() {
861             if (!cancelled) {
862                 cancelled = true;
863                 int pSize = pendingSize;
864 
865                 // release message and replace with an empty buffer
866                 ReferenceCountUtil.safeRelease(msg);
867                 msg = Unpooled.EMPTY_BUFFER;
868 
869                 pendingSize = 0;
870                 total = 0;
871                 progress = 0;
872                 bufs = null;
873                 buf = null;
874                 return pSize;
875             }
876             return 0;
877         }
878 
879         void unguardedRecycle() {
880             next = null;
881             bufs = null;
882             buf = null;
883             msg = null;
884             promise = null;
885             progress = 0;
886             total = 0;
887             pendingSize = 0;
888             count = -1;
889             cancelled = false;
890             handle.unguardedRecycle(this);
891         }
892 
893         Entry unguardedRecycleAndGetNext() {
894             Entry next = this.next;
895             unguardedRecycle();
896             return next;
897         }
898     }
899 }