View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.AbstractReferenceCountedByteBuf;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.socket.nio.NioSocketChannel;
23  import io.netty.util.Recycler;
24  import io.netty.util.Recycler.EnhancedHandle;
25  import io.netty.util.ReferenceCountUtil;
26  import io.netty.util.concurrent.FastThreadLocal;
27  import io.netty.util.internal.InternalThreadLocalMap;
28  import io.netty.util.internal.ObjectPool.Handle;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PromiseNotificationUtil;
31  import io.netty.util.internal.SystemPropertyUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.nio.ByteBuffer;
36  import java.nio.channels.ClosedChannelException;
37  import java.util.Arrays;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
40  
41  import static java.lang.Math.min;
42  
43  /**
44   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
45   * outbound write requests.
46   * <p>
47   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
48   * <ul>
49   * <li>{@link #isWritable()}</li>
50   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
51   * </ul>
52   * </p>
53   */
54  public final class ChannelOutboundBuffer {
// Estimated bookkeeping overhead of one pending-write entry. 96 is the fallback value handed to
// SystemPropertyUtil.getInt for the "io.netty.transport.outboundBufferEntrySizeOverhead" property.
// The estimate assumes a 64-bit JVM:
//  - 16 bytes object header
//  - 6 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);

// Per-thread scratch array returned by nioBuffers(...). It starts with 1024 slots and is swapped for a
// larger array when more slots are needed.
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
    @Override
    protected ByteBuffer[] initialValue() throws Exception {
        return new ByteBuffer[1024];
    }
};

// The channel this buffer serves; it supplies the config (water marks), the pipeline and the event loop.
private final Channel channel;

// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;

// Number of slots of the NIO_BUFFERS array filled by the most recent nioBuffers(...) call
private int nioBufferCount;
// Total readable bytes collected by the most recent nioBuffers(...) call
private long nioBufferSize;

// Set while failFlushed(...) or close(...) is running so that re-entrant calls can be detected
private boolean inFail;

private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

// Running total of pending bytes; modified only through TOTAL_PENDING_SIZE_UPDATER
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// Bit set of reasons for being unwritable; 0 means writable. Bit 0 is toggled by setWritable/setUnwritable
// (water marks), bits 1-31 are the user-defined writability flags.
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;

// Lazily created task that fires channelWritabilityChanged when the notification is deferred to the event loop
private volatile Runnable fireChannelWritabilityChangedTask;
105 
/**
 * Creates an outbound buffer for the given channel.
 *
 * @param channel the channel whose config, pipeline and event loop this buffer uses
 */
ChannelOutboundBuffer(AbstractChannel channel) {
    this.channel = channel;
}
109 
110     /**
111      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
112      * the message was written.
113      */
114     public void addMessage(Object msg, int size, ChannelPromise promise) {
115         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
116         if (tailEntry == null) {
117             flushedEntry = null;
118         } else {
119             Entry tail = tailEntry;
120             tail.next = entry;
121         }
122         tailEntry = entry;
123         if (unflushedEntry == null) {
124             unflushedEntry = entry;
125         }
126 
127         // Touch the message to make it easier to debug buffer leaks.
128 
129         // this save both checking against the ReferenceCounted interface
130         // and makes better use of virtual calls vs interface ones
131         if (msg instanceof AbstractReferenceCountedByteBuf) {
132             ((AbstractReferenceCountedByteBuf) msg).touch();
133         } else {
134             ReferenceCountUtil.touch(msg);
135         }
136 
137         // increment pending bytes after adding message to the unflushed arrays.
138         // See https://github.com/netty/netty/issues/1619
139         incrementPendingOutboundBytes(entry.pendingSize, false);
140     }
141 
142     /**
143      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
144      * and so you will be able to handle them.
145      */
146     public void addFlush() {
147         // There is no need to process all entries if there was already a flush before and no new messages
148         // where added in the meantime.
149         //
150         // See https://github.com/netty/netty/issues/2577
151         Entry entry = unflushedEntry;
152         if (entry != null) {
153             if (flushedEntry == null) {
154                 // there is no flushedEntry yet, so start with the entry
155                 flushedEntry = entry;
156             }
157             do {
158                 flushed ++;
159                 if (!entry.promise.setUncancellable()) {
160                     // Was cancelled so make sure we free up memory and notify about the freed bytes
161                     int pending = entry.cancel();
162                     decrementPendingOutboundBytes(pending, false, true);
163                 }
164                 entry = entry.next;
165             } while (entry != null);
166 
167             // All flushed so reset unflushedEntry
168             unflushedEntry = null;
169         }
170     }
171 
172     /**
173      * Increment the pending bytes which will be written at some point.
174      * This method is thread-safe!
175      */
176     void incrementPendingOutboundBytes(long size) {
177         incrementPendingOutboundBytes(size, true);
178     }
179 
180     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
181         if (size == 0) {
182             return;
183         }
184 
185         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
186         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
187             setUnwritable(invokeLater);
188         }
189     }
190 
191     /**
192      * Decrement the pending bytes which will be written at some point.
193      * This method is thread-safe!
194      */
195     void decrementPendingOutboundBytes(long size) {
196         decrementPendingOutboundBytes(size, true, true);
197     }
198 
199     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
200         if (size == 0) {
201             return;
202         }
203 
204         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
205         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
206             setWritable(invokeLater);
207         }
208     }
209 
210     private static long total(Object msg) {
211         if (msg instanceof ByteBuf) {
212             return ((ByteBuf) msg).readableBytes();
213         }
214         if (msg instanceof FileRegion) {
215             return ((FileRegion) msg).count();
216         }
217         if (msg instanceof ByteBufHolder) {
218             return ((ByteBufHolder) msg).content().readableBytes();
219         }
220         return -1;
221     }
222 
223     /**
224      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
225      */
226     public Object current() {
227         Entry entry = flushedEntry;
228         if (entry == null) {
229             return null;
230         }
231 
232         return entry.msg;
233     }
234 
235     /**
236      * Return the current message flush progress.
237      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
238      */
239     public long currentProgress() {
240         Entry entry = flushedEntry;
241         if (entry == null) {
242             return 0;
243         }
244         return entry.progress;
245     }
246 
247     /**
248      * Notify the {@link ChannelPromise} of the current message about writing progress.
249      */
250     public void progress(long amount) {
251         Entry e = flushedEntry;
252         assert e != null;
253         ChannelPromise p = e.promise;
254         long progress = e.progress + amount;
255         e.progress = progress;
256         assert p != null;
257         final Class<?> promiseClass = p.getClass();
258         // fast-path to save O(n) ChannelProgressivePromise's type check on OpenJDK
259         if (promiseClass == VoidChannelPromise.class || promiseClass == DefaultChannelPromise.class) {
260             return;
261         }
262         // this is going to save from type pollution due to https://bugs.openjdk.org/browse/JDK-8180450
263         if (p instanceof DefaultChannelProgressivePromise) {
264             ((DefaultChannelProgressivePromise) p).tryProgress(progress, e.total);
265         } else if (p instanceof ChannelProgressivePromise) {
266             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
267         }
268     }
269 
270     /**
271      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
272      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
273      * messages are ready to be handled.
274      */
275     public boolean remove() {
276         Entry e = flushedEntry;
277         if (e == null) {
278             clearNioBuffers();
279             return false;
280         }
281         Object msg = e.msg;
282 
283         ChannelPromise promise = e.promise;
284         int size = e.pendingSize;
285 
286         removeEntry(e);
287 
288         // only release message, notify and decrement if it was not canceled before.
289         if (!e.cancelled) {
290             // this save both checking against the ReferenceCounted interface
291             // and makes better use of virtual calls vs interface ones
292             if (msg instanceof AbstractReferenceCountedByteBuf) {
293                 try {
294                     // release now as it is flushed.
295                     ((AbstractReferenceCountedByteBuf) msg).release();
296                 } catch (Throwable t) {
297                     logger.warn("Failed to release a ByteBuf: {}", msg, t);
298                 }
299             } else {
300                 ReferenceCountUtil.safeRelease(msg);
301             }
302             safeSuccess(promise);
303             decrementPendingOutboundBytes(size, false, true);
304         }
305 
306         // recycle the entry
307         e.unguardedRecycle();
308 
309         return true;
310     }
311 
312     /**
313      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
314      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
315      * {@code false} to signal that no more messages are ready to be handled.
316      */
317     public boolean remove(Throwable cause) {
318         return remove0(cause, true);
319     }
320 
321     private boolean remove0(Throwable cause, boolean notifyWritability) {
322         Entry e = flushedEntry;
323         if (e == null) {
324             clearNioBuffers();
325             return false;
326         }
327         Object msg = e.msg;
328 
329         ChannelPromise promise = e.promise;
330         int size = e.pendingSize;
331 
332         removeEntry(e);
333 
334         if (!e.cancelled) {
335             // only release message, fail and decrement if it was not canceled before.
336             ReferenceCountUtil.safeRelease(msg);
337 
338             safeFail(promise, cause);
339             decrementPendingOutboundBytes(size, false, notifyWritability);
340         }
341 
342         // recycle the entry
343         e.unguardedRecycle();
344 
345         return true;
346     }
347 
348     private void removeEntry(Entry e) {
349         if (-- flushed == 0) {
350             // processed everything
351             flushedEntry = null;
352             if (e == tailEntry) {
353                 tailEntry = null;
354                 unflushedEntry = null;
355             }
356         } else {
357             flushedEntry = e.next;
358         }
359     }
360 
361     /**
362      * Removes the fully written entries and update the reader index of the partially written entry.
363      * This operation assumes all messages in this buffer is {@link ByteBuf}.
364      */
365     public void removeBytes(long writtenBytes) {
366         for (;;) {
367             Object msg = current();
368             if (!(msg instanceof ByteBuf)) {
369                 assert writtenBytes == 0;
370                 break;
371             }
372 
373             final ByteBuf buf = (ByteBuf) msg;
374             final int readerIndex = buf.readerIndex();
375             final int readableBytes = buf.writerIndex() - readerIndex;
376 
377             if (readableBytes <= writtenBytes) {
378                 if (writtenBytes != 0) {
379                     progress(readableBytes);
380                     writtenBytes -= readableBytes;
381                 }
382                 remove();
383             } else { // readableBytes > writtenBytes
384                 if (writtenBytes != 0) {
385                     buf.readerIndex(readerIndex + (int) writtenBytes);
386                     progress(writtenBytes);
387                 }
388                 break;
389             }
390         }
391         clearNioBuffers();
392     }
393 
394     // Clear all ByteBuffer from the array so these can be GC'ed.
395     // See https://github.com/netty/netty/issues/3837
396     private void clearNioBuffers() {
397         int count = nioBufferCount;
398         if (count > 0) {
399             nioBufferCount = 0;
400             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
401         }
402     }
403 
404     /**
405      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
406      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
407      * array and the total number of readable bytes of the NIO buffers respectively.
408      * <p>
409      * Note that the returned array is reused and thus should not escape
410      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
411      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
412      * </p>
413      */
414     public ByteBuffer[] nioBuffers() {
415         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
416     }
417 
418     /**
419      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
420      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
421      * array and the total number of readable bytes of the NIO buffers respectively.
422      * <p>
423      * Note that the returned array is reused and thus should not escape
424      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
425      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
426      * </p>
427      * @param maxCount The maximum amount of buffers that will be added to the return value.
428      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
429      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
430      *                 in the return value to ensure write progress is made.
431      */
432     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
433         assert maxCount > 0;
434         assert maxBytes > 0;
435         long nioBufferSize = 0;
436         int nioBufferCount = 0;
437         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
438         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
439         Entry entry = flushedEntry;
440         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
441             if (!entry.cancelled) {
442                 ByteBuf buf = (ByteBuf) entry.msg;
443                 final int readerIndex = buf.readerIndex();
444                 final int readableBytes = buf.writerIndex() - readerIndex;
445 
446                 if (readableBytes > 0) {
447                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
448                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
449                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
450                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
451                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
452                         // on the architecture and kernel but to be safe we also enforce the limit here.
453                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
454                         // OS.
455                         //
456                         // See also:
457                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
458                         // - https://linux.die.net//man/2/writev
459                         break;
460                     }
461                     nioBufferSize += readableBytes;
462                     int count = entry.count;
463                     if (count == -1) {
464                         //noinspection ConstantValueVariableUse
465                         entry.count = count = buf.nioBufferCount();
466                     }
467                     int neededSpace = min(maxCount, nioBufferCount + count);
468                     if (neededSpace > nioBuffers.length) {
469                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
470                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
471                     }
472                     if (count == 1) {
473                         ByteBuffer nioBuf = entry.buf;
474                         if (nioBuf == null) {
475                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
476                             // derived buffer
477                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
478                         }
479                         nioBuffers[nioBufferCount++] = nioBuf;
480                     } else {
481                         // The code exists in an extra method to ensure the method is not too big to inline as this
482                         // branch is not very likely to get hit very frequently.
483                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
484                     }
485                     if (nioBufferCount >= maxCount) {
486                         break;
487                     }
488                 }
489             }
490             entry = entry.next;
491         }
492         this.nioBufferCount = nioBufferCount;
493         this.nioBufferSize = nioBufferSize;
494 
495         return nioBuffers;
496     }
497 
498     private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
499         ByteBuffer[] nioBufs = entry.bufs;
500         if (nioBufs == null) {
501             // cached ByteBuffers as they may be expensive to create in terms
502             // of Object allocation
503             entry.bufs = nioBufs = buf.nioBuffers();
504         }
505         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
506             ByteBuffer nioBuf = nioBufs[i];
507             if (nioBuf == null) {
508                 break;
509             } else if (!nioBuf.hasRemaining()) {
510                 continue;
511             }
512             nioBuffers[nioBufferCount++] = nioBuf;
513         }
514         return nioBufferCount;
515     }
516 
517     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
518         int newCapacity = array.length;
519         do {
520             // double capacity until it is big enough
521             // See https://github.com/netty/netty/issues/1890
522             newCapacity <<= 1;
523 
524             if (newCapacity < 0) {
525                 throw new IllegalStateException();
526             }
527 
528         } while (neededSpace > newCapacity);
529 
530         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
531         System.arraycopy(array, 0, newArray, 0, size);
532 
533         return newArray;
534     }
535 
536     /**
537      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
538      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
539      * was called.
540      */
541     public int nioBufferCount() {
542         return nioBufferCount;
543     }
544 
545     /**
546      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
547      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
548      * was called.
549      */
550     public long nioBufferSize() {
551         return nioBufferSize;
552     }
553 
554     /**
555      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
556      * not exceed the write watermark of the {@link Channel} and
557      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
558      * {@code false}.
559      */
560     public boolean isWritable() {
561         return unwritable == 0;
562     }
563 
564     /**
565      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
566      * {@code true}.
567      */
568     public boolean getUserDefinedWritability(int index) {
569         return (unwritable & writabilityMask(index)) == 0;
570     }
571 
572     /**
573      * Sets a user-defined writability flag at the specified index.
574      */
575     public void setUserDefinedWritability(int index, boolean writable) {
576         if (writable) {
577             setUserDefinedWritability(index);
578         } else {
579             clearUserDefinedWritability(index);
580         }
581     }
582 
583     private void setUserDefinedWritability(int index) {
584         final int mask = ~writabilityMask(index);
585         for (;;) {
586             final int oldValue = unwritable;
587             final int newValue = oldValue & mask;
588             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
589                 if (oldValue != 0 && newValue == 0) {
590                     fireChannelWritabilityChanged(true);
591                 }
592                 break;
593             }
594         }
595     }
596 
597     private void clearUserDefinedWritability(int index) {
598         final int mask = writabilityMask(index);
599         for (;;) {
600             final int oldValue = unwritable;
601             final int newValue = oldValue | mask;
602             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
603                 if (oldValue == 0 && newValue != 0) {
604                     fireChannelWritabilityChanged(true);
605                 }
606                 break;
607             }
608         }
609     }
610 
611     private static int writabilityMask(int index) {
612         if (index < 1 || index > 31) {
613             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
614         }
615         return 1 << index;
616     }
617 
618     private void setWritable(boolean invokeLater) {
619         for (;;) {
620             final int oldValue = unwritable;
621             final int newValue = oldValue & ~1;
622             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
623                 if (oldValue != 0 && newValue == 0) {
624                     fireChannelWritabilityChanged(invokeLater);
625                 }
626                 break;
627             }
628         }
629     }
630 
631     private void setUnwritable(boolean invokeLater) {
632         for (;;) {
633             final int oldValue = unwritable;
634             final int newValue = oldValue | 1;
635             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
636                 if (oldValue == 0) {
637                     fireChannelWritabilityChanged(invokeLater);
638                 }
639                 break;
640             }
641         }
642     }
643 
644     private void fireChannelWritabilityChanged(boolean invokeLater) {
645         final ChannelPipeline pipeline = channel.pipeline();
646         if (invokeLater) {
647             Runnable task = fireChannelWritabilityChangedTask;
648             if (task == null) {
649                 fireChannelWritabilityChangedTask = task = new Runnable() {
650                     @Override
651                     public void run() {
652                         pipeline.fireChannelWritabilityChanged();
653                     }
654                 };
655             }
656             channel.eventLoop().execute(task);
657         } else {
658             pipeline.fireChannelWritabilityChanged();
659         }
660     }
661 
662     /**
663      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
664      */
665     public int size() {
666         return flushed;
667     }
668 
669     /**
670      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
671      * otherwise.
672      */
673     public boolean isEmpty() {
674         return flushed == 0;
675     }
676 
677     void failFlushed(Throwable cause, boolean notify) {
678         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
679         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
680         // indirectly (usually by closing the channel.)
681         //
682         // See https://github.com/netty/netty/issues/1501
683         if (inFail) {
684             return;
685         }
686 
687         try {
688             inFail = true;
689             for (;;) {
690                 if (!remove0(cause, notify)) {
691                     break;
692                 }
693             }
694         } finally {
695             inFail = false;
696         }
697     }
698 
699     void close(final Throwable cause, final boolean allowChannelOpen) {
700         if (inFail) {
701             channel.eventLoop().execute(new Runnable() {
702                 @Override
703                 public void run() {
704                     close(cause, allowChannelOpen);
705                 }
706             });
707             return;
708         }
709 
710         inFail = true;
711 
712         if (!allowChannelOpen && channel.isOpen()) {
713             throw new IllegalStateException("close() must be invoked after the channel is closed.");
714         }
715 
716         if (!isEmpty()) {
717             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
718         }
719 
720         // Release all unflushed messages.
721         try {
722             Entry e = unflushedEntry;
723             while (e != null) {
724                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
725                 int size = e.pendingSize;
726                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
727 
728                 if (!e.cancelled) {
729                     ReferenceCountUtil.safeRelease(e.msg);
730                     safeFail(e.promise, cause);
731                 }
732                 e = e.unguardedRecycleAndGetNext();
733             }
734         } finally {
735             inFail = false;
736         }
737         clearNioBuffers();
738     }
739 
740     void close(ClosedChannelException cause) {
741         close(cause, false);
742     }
743 
744     private static void safeSuccess(ChannelPromise promise) {
745         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
746         // false.
747         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
748     }
749 
750     private static void safeFail(ChannelPromise promise, Throwable cause) {
751         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
752         // false.
753         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
754     }
755 
756     @Deprecated
757     public void recycle() {
758         // NOOP
759     }
760 
761     public long totalPendingWriteBytes() {
762         return totalPendingSize;
763     }
764 
765     /**
766      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
767      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
768      */
769     public long bytesBeforeUnwritable() {
770         // +1 because writability doesn't change until the threshold is crossed (not equal to).
771         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
772         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
773         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
774         // together. totalPendingSize will be updated before isWritable().
775         return bytes > 0 && isWritable() ? bytes : 0;
776     }
777 
778     /**
779      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
780      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
781      */
782     public long bytesBeforeWritable() {
783         // +1 because writability doesn't change until the threshold is crossed (not equal to).
784         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark() + 1;
785         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
786         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
787         // together. totalPendingSize will be updated before isWritable().
788         return bytes <= 0 || isWritable() ? 0 : bytes;
789     }
790 
791     /**
792      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
793      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
794      * returns {@code false} or there are no more flushed messages to process.
795      */
796     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
797         ObjectUtil.checkNotNull(processor, "processor");
798 
799         Entry entry = flushedEntry;
800         if (entry == null) {
801             return;
802         }
803 
804         do {
805             if (!entry.cancelled) {
806                 if (!processor.processMessage(entry.msg)) {
807                     return;
808                 }
809             }
810             entry = entry.next;
811         } while (isFlushedEntry(entry));
812     }
813 
814     private boolean isFlushedEntry(Entry e) {
815         return e != null && e != unflushedEntry;
816     }
817 
818     public interface MessageProcessor {
819         /**
820          * Will be called for each flushed message until it either there are no more flushed messages or this
821          * method returns {@code false}.
822          */
823         boolean processMessage(Object msg) throws Exception;
824     }
825 
826     static final class Entry {
827         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
828             @Override
829             protected Entry newObject(Handle<Entry> handle) {
830                 return new Entry(handle);
831             }
832         };
833 
834         private final EnhancedHandle<Entry> handle;
835         Entry next;
836         Object msg;
837         ByteBuffer[] bufs;
838         ByteBuffer buf;
839         ChannelPromise promise;
840         long progress;
841         long total;
842         int pendingSize;
843         int count = -1;
844         boolean cancelled;
845 
846         private Entry(Handle<Entry> handle) {
847             this.handle = (EnhancedHandle<Entry>) handle;
848         }
849 
850         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
851             Entry entry = RECYCLER.get();
852             entry.msg = msg;
853             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
854             entry.total = total;
855             entry.promise = promise;
856             return entry;
857         }
858 
859         int cancel() {
860             if (!cancelled) {
861                 cancelled = true;
862                 int pSize = pendingSize;
863 
864                 // release message and replace with an empty buffer
865                 ReferenceCountUtil.safeRelease(msg);
866                 msg = Unpooled.EMPTY_BUFFER;
867 
868                 pendingSize = 0;
869                 total = 0;
870                 progress = 0;
871                 bufs = null;
872                 buf = null;
873                 return pSize;
874             }
875             return 0;
876         }
877 
878         void unguardedRecycle() {
879             next = null;
880             bufs = null;
881             buf = null;
882             msg = null;
883             promise = null;
884             progress = 0;
885             total = 0;
886             pendingSize = 0;
887             count = -1;
888             cancelled = false;
889             handle.unguardedRecycle(this);
890         }
891 
892         Entry unguardedRecycleAndGetNext() {
893             Entry next = this.next;
894             unguardedRecycle();
895             return next;
896         }
897     }
898 }