View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.socket.nio.NioSocketChannel;
22  import io.netty.util.Recycler;
23  import io.netty.util.Recycler.Handle;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.internal.InternalThreadLocalMap;
27  import io.netty.util.internal.PromiseNotificationUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.util.Arrays;
35  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
37  
38  import static java.lang.Math.min;
39  
40  /**
41   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
42   * outbound write requests.
43   * <p>
44   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
45   * <ul>
46   * <li>{@link #size()} and {@link #isEmpty()}</li>
47   * <li>{@link #isWritable()}</li>
48   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
49   * </ul>
50   * </p>
51   */
52  public final class ChannelOutboundBuffer {
53      // Assuming a 64-bit JVM:
54      //  - 16 bytes object header
55      //  - 8 reference fields
56      //  - 2 long fields
57      //  - 2 int fields
58      //  - 1 boolean field
59      //  - padding
60      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
61              SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
64  
65      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
66          @Override
67          protected ByteBuffer[] initialValue() throws Exception {
68              return new ByteBuffer[1024];
69          }
70      };
71  
72      private final Channel channel;
73  
74      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
75      //
76      // The Entry that is the first in the linked-list structure that was flushed
77      private Entry flushedEntry;
78      // The Entry which is the first unflushed in the linked-list structure
79      private Entry unflushedEntry;
80      // The Entry which represents the tail of the buffer
81      private Entry tailEntry;
82      // The number of flushed entries that are not written yet
83      private int flushed;
84  
85      private int nioBufferCount;
86      private long nioBufferSize;
87  
88      private boolean inFail;
89  
90      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
91              AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
92  
93      @SuppressWarnings("UnusedDeclaration")
94      private volatile long totalPendingSize;
95  
96      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
97              AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
98  
99      @SuppressWarnings("UnusedDeclaration")
100     private volatile int unwritable;
101 
102     private volatile Runnable fireChannelWritabilityChangedTask;
103 
104     ChannelOutboundBuffer(AbstractChannel channel) {
105         this.channel = channel;
106     }
107 
108     /**
109      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
110      * the message was written.
111      */
112     public void addMessage(Object msg, int size, ChannelPromise promise) {
113         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
114         if (tailEntry == null) {
115             flushedEntry = null;
116             tailEntry = entry;
117         } else {
118             Entry tail = tailEntry;
119             tail.next = entry;
120             tailEntry = entry;
121         }
122         if (unflushedEntry == null) {
123             unflushedEntry = entry;
124         }
125 
126         // increment pending bytes after adding message to the unflushed arrays.
127         // See https://github.com/netty/netty/issues/1619
128         incrementPendingOutboundBytes(entry.pendingSize, false);
129     }
130 
131     /**
132      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
133      * and so you will be able to handle them.
134      */
135     public void addFlush() {
136         // There is no need to process all entries if there was already a flush before and no new messages
137         // where added in the meantime.
138         //
139         // See https://github.com/netty/netty/issues/2577
140         Entry entry = unflushedEntry;
141         if (entry != null) {
142             if (flushedEntry == null) {
143                 // there is no flushedEntry yet, so start with the entry
144                 flushedEntry = entry;
145             }
146             do {
147                 flushed ++;
148                 if (!entry.promise.setUncancellable()) {
149                     // Was cancelled so make sure we free up memory and notify about the freed bytes
150                     int pending = entry.cancel();
151                     decrementPendingOutboundBytes(pending, false, true);
152                 }
153                 entry = entry.next;
154             } while (entry != null);
155 
156             // All flushed so reset unflushedEntry
157             unflushedEntry = null;
158         }
159     }
160 
161     /**
162      * Increment the pending bytes which will be written at some point.
163      * This method is thread-safe!
164      */
165     void incrementPendingOutboundBytes(long size) {
166         incrementPendingOutboundBytes(size, true);
167     }
168 
169     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
170         if (size == 0) {
171             return;
172         }
173 
174         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
175         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
176             setUnwritable(invokeLater);
177         }
178     }
179 
180     /**
181      * Decrement the pending bytes which will be written at some point.
182      * This method is thread-safe!
183      */
184     void decrementPendingOutboundBytes(long size) {
185         decrementPendingOutboundBytes(size, true, true);
186     }
187 
188     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
189         if (size == 0) {
190             return;
191         }
192 
193         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
194         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
195             setWritable(invokeLater);
196         }
197     }
198 
199     private static long total(Object msg) {
200         if (msg instanceof ByteBuf) {
201             return ((ByteBuf) msg).readableBytes();
202         }
203         if (msg instanceof FileRegion) {
204             return ((FileRegion) msg).count();
205         }
206         if (msg instanceof ByteBufHolder) {
207             return ((ByteBufHolder) msg).content().readableBytes();
208         }
209         return -1;
210     }
211 
212     /**
213      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
214      */
215     public Object current() {
216         Entry entry = flushedEntry;
217         if (entry == null) {
218             return null;
219         }
220 
221         return entry.msg;
222     }
223 
224     /**
225      * Notify the {@link ChannelPromise} of the current message about writing progress.
226      */
227     public void progress(long amount) {
228         Entry e = flushedEntry;
229         assert e != null;
230         ChannelPromise p = e.promise;
231         if (p instanceof ChannelProgressivePromise) {
232             long progress = e.progress + amount;
233             e.progress = progress;
234             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
235         }
236     }
237 
238     /**
239      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
240      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
241      * messages are ready to be handled.
242      */
243     public boolean remove() {
244         Entry e = flushedEntry;
245         if (e == null) {
246             clearNioBuffers();
247             return false;
248         }
249         Object msg = e.msg;
250 
251         ChannelPromise promise = e.promise;
252         int size = e.pendingSize;
253 
254         removeEntry(e);
255 
256         if (!e.cancelled) {
257             // only release message, notify and decrement if it was not canceled before.
258             ReferenceCountUtil.safeRelease(msg);
259             safeSuccess(promise);
260             decrementPendingOutboundBytes(size, false, true);
261         }
262 
263         // recycle the entry
264         e.recycle();
265 
266         return true;
267     }
268 
269     /**
270      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
271      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
272      * {@code false} to signal that no more messages are ready to be handled.
273      */
274     public boolean remove(Throwable cause) {
275         return remove0(cause, true);
276     }
277 
278     private boolean remove0(Throwable cause, boolean notifyWritability) {
279         Entry e = flushedEntry;
280         if (e == null) {
281             clearNioBuffers();
282             return false;
283         }
284         Object msg = e.msg;
285 
286         ChannelPromise promise = e.promise;
287         int size = e.pendingSize;
288 
289         removeEntry(e);
290 
291         if (!e.cancelled) {
292             // only release message, fail and decrement if it was not canceled before.
293             ReferenceCountUtil.safeRelease(msg);
294 
295             safeFail(promise, cause);
296             decrementPendingOutboundBytes(size, false, notifyWritability);
297         }
298 
299         // recycle the entry
300         e.recycle();
301 
302         return true;
303     }
304 
305     private void removeEntry(Entry e) {
306         if (-- flushed == 0) {
307             // processed everything
308             flushedEntry = null;
309             if (e == tailEntry) {
310                 tailEntry = null;
311                 unflushedEntry = null;
312             }
313         } else {
314             flushedEntry = e.next;
315         }
316     }
317 
318     /**
319      * Removes the fully written entries and update the reader index of the partially written entry.
320      * This operation assumes all messages in this buffer is {@link ByteBuf}.
321      */
322     public void removeBytes(long writtenBytes) {
323         for (;;) {
324             Object msg = current();
325             if (!(msg instanceof ByteBuf)) {
326                 assert writtenBytes == 0;
327                 break;
328             }
329 
330             final ByteBuf buf = (ByteBuf) msg;
331             final int readerIndex = buf.readerIndex();
332             final int readableBytes = buf.writerIndex() - readerIndex;
333 
334             if (readableBytes <= writtenBytes) {
335                 if (writtenBytes != 0) {
336                     progress(readableBytes);
337                     writtenBytes -= readableBytes;
338                 }
339                 remove();
340             } else { // readableBytes > writtenBytes
341                 if (writtenBytes != 0) {
342                     buf.readerIndex(readerIndex + (int) writtenBytes);
343                     progress(writtenBytes);
344                 }
345                 break;
346             }
347         }
348         clearNioBuffers();
349     }
350 
351     // Clear all ByteBuffer from the array so these can be GC'ed.
352     // See https://github.com/netty/netty/issues/3837
353     private void clearNioBuffers() {
354         int count = nioBufferCount;
355         if (count > 0) {
356             nioBufferCount = 0;
357             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
358         }
359     }
360 
361     /**
362      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
363      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
364      * array and the total number of readable bytes of the NIO buffers respectively.
365      * <p>
366      * Note that the returned array is reused and thus should not escape
367      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
368      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
369      * </p>
370      */
371     public ByteBuffer[] nioBuffers() {
372         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
373     }
374 
375     /**
376      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
377      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
378      * array and the total number of readable bytes of the NIO buffers respectively.
379      * <p>
380      * Note that the returned array is reused and thus should not escape
381      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
382      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
383      * </p>
384      * @param maxCount The maximum amount of buffers that will be added to the return value.
385      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
386      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
387      *                 in the return value to ensure write progress is made.
388      */
389     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
390         assert maxCount > 0;
391         assert maxBytes > 0;
392         long nioBufferSize = 0;
393         int nioBufferCount = 0;
394         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
395         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
396         Entry entry = flushedEntry;
397         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
398             if (!entry.cancelled) {
399                 ByteBuf buf = (ByteBuf) entry.msg;
400                 final int readerIndex = buf.readerIndex();
401                 final int readableBytes = buf.writerIndex() - readerIndex;
402 
403                 if (readableBytes > 0) {
404                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
405                         // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
406                         // we stop populate the ByteBuffer array. This is done for 2 reasons:
407                         // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
408                         // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
409                         // on the architecture and kernel but to be safe we also enforce the limit here.
410                         // 2. There is no sense in putting more data in the array than is likely to be accepted by the
411                         // OS.
412                         //
413                         // See also:
414                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
415                         // - http://linux.die.net/man/2/writev
416                         break;
417                     }
418                     nioBufferSize += readableBytes;
419                     int count = entry.count;
420                     if (count == -1) {
421                         //noinspection ConstantValueVariableUse
422                         entry.count = count = buf.nioBufferCount();
423                     }
424                     int neededSpace = min(maxCount, nioBufferCount + count);
425                     if (neededSpace > nioBuffers.length) {
426                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
427                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
428                     }
429                     if (count == 1) {
430                         ByteBuffer nioBuf = entry.buf;
431                         if (nioBuf == null) {
432                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
433                             // derived buffer
434                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
435                         }
436                         nioBuffers[nioBufferCount++] = nioBuf;
437                     } else {
438                         ByteBuffer[] nioBufs = entry.bufs;
439                         if (nioBufs == null) {
440                             // cached ByteBuffers as they may be expensive to create in terms
441                             // of Object allocation
442                             entry.bufs = nioBufs = buf.nioBuffers();
443                         }
444                         for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
445                             ByteBuffer nioBuf = nioBufs[i];
446                             if (nioBuf == null) {
447                                 break;
448                             } else if (!nioBuf.hasRemaining()) {
449                                 continue;
450                             }
451                             nioBuffers[nioBufferCount++] = nioBuf;
452                         }
453                     }
454                     if (nioBufferCount == maxCount) {
455                         break;
456                     }
457                 }
458             }
459             entry = entry.next;
460         }
461         this.nioBufferCount = nioBufferCount;
462         this.nioBufferSize = nioBufferSize;
463 
464         return nioBuffers;
465     }
466 
467     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
468         int newCapacity = array.length;
469         do {
470             // double capacity until it is big enough
471             // See https://github.com/netty/netty/issues/1890
472             newCapacity <<= 1;
473 
474             if (newCapacity < 0) {
475                 throw new IllegalStateException();
476             }
477 
478         } while (neededSpace > newCapacity);
479 
480         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
481         System.arraycopy(array, 0, newArray, 0, size);
482 
483         return newArray;
484     }
485 
486     /**
487      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
488      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
489      * was called.
490      */
491     public int nioBufferCount() {
492         return nioBufferCount;
493     }
494 
495     /**
496      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
497      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
498      * was called.
499      */
500     public long nioBufferSize() {
501         return nioBufferSize;
502     }
503 
504     /**
505      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
506      * not exceed the write watermark of the {@link Channel} and
507      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
508      * {@code false}.
509      */
510     public boolean isWritable() {
511         return unwritable == 0;
512     }
513 
514     /**
515      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
516      * {@code true}.
517      */
518     public boolean getUserDefinedWritability(int index) {
519         return (unwritable & writabilityMask(index)) == 0;
520     }
521 
522     /**
523      * Sets a user-defined writability flag at the specified index.
524      */
525     public void setUserDefinedWritability(int index, boolean writable) {
526         if (writable) {
527             setUserDefinedWritability(index);
528         } else {
529             clearUserDefinedWritability(index);
530         }
531     }
532 
533     private void setUserDefinedWritability(int index) {
534         final int mask = ~writabilityMask(index);
535         for (;;) {
536             final int oldValue = unwritable;
537             final int newValue = oldValue & mask;
538             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
539                 if (oldValue != 0 && newValue == 0) {
540                     fireChannelWritabilityChanged(true);
541                 }
542                 break;
543             }
544         }
545     }
546 
547     private void clearUserDefinedWritability(int index) {
548         final int mask = writabilityMask(index);
549         for (;;) {
550             final int oldValue = unwritable;
551             final int newValue = oldValue | mask;
552             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
553                 if (oldValue == 0 && newValue != 0) {
554                     fireChannelWritabilityChanged(true);
555                 }
556                 break;
557             }
558         }
559     }
560 
561     private static int writabilityMask(int index) {
562         if (index < 1 || index > 31) {
563             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
564         }
565         return 1 << index;
566     }
567 
568     private void setWritable(boolean invokeLater) {
569         for (;;) {
570             final int oldValue = unwritable;
571             final int newValue = oldValue & ~1;
572             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
573                 if (oldValue != 0 && newValue == 0) {
574                     fireChannelWritabilityChanged(invokeLater);
575                 }
576                 break;
577             }
578         }
579     }
580 
581     private void setUnwritable(boolean invokeLater) {
582         for (;;) {
583             final int oldValue = unwritable;
584             final int newValue = oldValue | 1;
585             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
586                 if (oldValue == 0 && newValue != 0) {
587                     fireChannelWritabilityChanged(invokeLater);
588                 }
589                 break;
590             }
591         }
592     }
593 
594     private void fireChannelWritabilityChanged(boolean invokeLater) {
595         final ChannelPipeline pipeline = channel.pipeline();
596         if (invokeLater) {
597             Runnable task = fireChannelWritabilityChangedTask;
598             if (task == null) {
599                 fireChannelWritabilityChangedTask = task = new Runnable() {
600                     @Override
601                     public void run() {
602                         pipeline.fireChannelWritabilityChanged();
603                     }
604                 };
605             }
606             channel.eventLoop().execute(task);
607         } else {
608             pipeline.fireChannelWritabilityChanged();
609         }
610     }
611 
612     /**
613      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
614      */
615     public int size() {
616         return flushed;
617     }
618 
619     /**
620      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
621      * otherwise.
622      */
623     public boolean isEmpty() {
624         return flushed == 0;
625     }
626 
627     void failFlushed(Throwable cause, boolean notify) {
628         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
629         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
630         // indirectly (usually by closing the channel.)
631         //
632         // See https://github.com/netty/netty/issues/1501
633         if (inFail) {
634             return;
635         }
636 
637         try {
638             inFail = true;
639             for (;;) {
640                 if (!remove0(cause, notify)) {
641                     break;
642                 }
643             }
644         } finally {
645             inFail = false;
646         }
647     }
648 
649     void close(final Throwable cause, final boolean allowChannelOpen) {
650         if (inFail) {
651             channel.eventLoop().execute(new Runnable() {
652                 @Override
653                 public void run() {
654                     close(cause, allowChannelOpen);
655                 }
656             });
657             return;
658         }
659 
660         inFail = true;
661 
662         if (!allowChannelOpen && channel.isOpen()) {
663             throw new IllegalStateException("close() must be invoked after the channel is closed.");
664         }
665 
666         if (!isEmpty()) {
667             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
668         }
669 
670         // Release all unflushed messages.
671         try {
672             Entry e = unflushedEntry;
673             while (e != null) {
674                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
675                 int size = e.pendingSize;
676                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
677 
678                 if (!e.cancelled) {
679                     ReferenceCountUtil.safeRelease(e.msg);
680                     safeFail(e.promise, cause);
681                 }
682                 e = e.recycleAndGetNext();
683             }
684         } finally {
685             inFail = false;
686         }
687         clearNioBuffers();
688     }
689 
690     void close(ClosedChannelException cause) {
691         close(cause, false);
692     }
693 
694     private static void safeSuccess(ChannelPromise promise) {
695         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
696         // false.
697         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
698     }
699 
700     private static void safeFail(ChannelPromise promise, Throwable cause) {
701         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
702         // false.
703         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
704     }
705 
706     @Deprecated
707     public void recycle() {
708         // NOOP
709     }
710 
711     public long totalPendingWriteBytes() {
712         return totalPendingSize;
713     }
714 
715     /**
716      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
717      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
718      */
719     public long bytesBeforeUnwritable() {
720         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
721         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
722         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
723         // together. totalPendingSize will be updated before isWritable().
724         if (bytes > 0) {
725             return isWritable() ? bytes : 0;
726         }
727         return 0;
728     }
729 
730     /**
731      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
732      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
733      */
734     public long bytesBeforeWritable() {
735         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
736         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
737         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
738         // together. totalPendingSize will be updated before isWritable().
739         if (bytes > 0) {
740             return isWritable() ? 0 : bytes;
741         }
742         return 0;
743     }
744 
745     /**
746      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
747      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
748      * returns {@code false} or there are no more flushed messages to process.
749      */
750     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
751         if (processor == null) {
752             throw new NullPointerException("processor");
753         }
754 
755         Entry entry = flushedEntry;
756         if (entry == null) {
757             return;
758         }
759 
760         do {
761             if (!entry.cancelled) {
762                 if (!processor.processMessage(entry.msg)) {
763                     return;
764                 }
765             }
766             entry = entry.next;
767         } while (isFlushedEntry(entry));
768     }
769 
770     private boolean isFlushedEntry(Entry e) {
771         return e != null && e != unflushedEntry;
772     }
773 
774     public interface MessageProcessor {
775         /**
776          * Will be called for each flushed message until it either there are no more flushed messages or this
777          * method returns {@code false}.
778          */
779         boolean processMessage(Object msg) throws Exception;
780     }
781 
782     static final class Entry {
783         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
784             @Override
785             protected Entry newObject(Handle<Entry> handle) {
786                 return new Entry(handle);
787             }
788         };
789 
790         private final Handle<Entry> handle;
791         Entry next;
792         Object msg;
793         ByteBuffer[] bufs;
794         ByteBuffer buf;
795         ChannelPromise promise;
796         long progress;
797         long total;
798         int pendingSize;
799         int count = -1;
800         boolean cancelled;
801 
802         private Entry(Handle<Entry> handle) {
803             this.handle = handle;
804         }
805 
806         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
807             Entry entry = RECYCLER.get();
808             entry.msg = msg;
809             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
810             entry.total = total;
811             entry.promise = promise;
812             return entry;
813         }
814 
815         int cancel() {
816             if (!cancelled) {
817                 cancelled = true;
818                 int pSize = pendingSize;
819 
820                 // release message and replace with an empty buffer
821                 ReferenceCountUtil.safeRelease(msg);
822                 msg = Unpooled.EMPTY_BUFFER;
823 
824                 pendingSize = 0;
825                 total = 0;
826                 progress = 0;
827                 bufs = null;
828                 buf = null;
829                 return pSize;
830             }
831             return 0;
832         }
833 
834         void recycle() {
835             next = null;
836             bufs = null;
837             buf = null;
838             msg = null;
839             promise = null;
840             progress = 0;
841             total = 0;
842             pendingSize = 0;
843             count = -1;
844             cancelled = false;
845             handle.recycle(this);
846         }
847 
848         Entry recycleAndGetNext() {
849             Entry next = this.next;
850             recycle();
851             return next;
852         }
853     }
854 }