View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.socket.nio.NioSocketChannel;
22  import io.netty.util.Recycler;
23  import io.netty.util.Recycler.Handle;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.internal.InternalThreadLocalMap;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.PromiseNotificationUtil;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ClosedChannelException;
35  import java.util.Arrays;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
38  
39  /**
40   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
41   * outbound write requests.
42   * <p>
43   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
44   * <ul>
45   * <li>{@link #size()} and {@link #isEmpty()}</li>
46   * <li>{@link #isWritable()}</li>
47   * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>
48   * </ul>
49   * </p>
50   */
51  public final class ChannelOutboundBuffer {
52      // Assuming a 64-bit JVM:
53      //  - 16 bytes object header
54      //  - 8 reference fields
55      //  - 2 long fields
56      //  - 2 int fields
57      //  - 1 boolean field
58      //  - padding
59      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
60              SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
61  
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
63  
64      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
65          @Override
66          protected ByteBuffer[] initialValue() throws Exception {
67              return new ByteBuffer[1024];
68          }
69      };
70  
71      private final Channel channel;
72  
73      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
74      //
75      // The Entry that is the first in the linked-list structure that was flushed
76      private Entry flushedEntry;
77      // The Entry which is the first unflushed in the linked-list structure
78      private Entry unflushedEntry;
79      // The Entry which represents the tail of the buffer
80      private Entry tailEntry;
81      // The number of flushed entries that are not written yet
82      private int flushed;
83  
84      private int nioBufferCount;
85      private long nioBufferSize;
86  
87      private boolean inFail;
88  
89      private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
90              AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
91  
92      @SuppressWarnings("UnusedDeclaration")
93      private volatile long totalPendingSize;
94  
95      private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
96              AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
97  
98      @SuppressWarnings("UnusedDeclaration")
99      private volatile int unwritable;
100 
101     private volatile Runnable fireChannelWritabilityChangedTask;
102 
103     ChannelOutboundBuffer(AbstractChannel channel) {
104         this.channel = channel;
105     }
106 
107     /**
108      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
109      * the message was written.
110      */
111     public void addMessage(Object msg, int size, ChannelPromise promise) {
112         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
113         if (tailEntry == null) {
114             flushedEntry = null;
115             tailEntry = entry;
116         } else {
117             Entry tail = tailEntry;
118             tail.next = entry;
119             tailEntry = entry;
120         }
121         if (unflushedEntry == null) {
122             unflushedEntry = entry;
123         }
124 
125         // increment pending bytes after adding message to the unflushed arrays.
126         // See https://github.com/netty/netty/issues/1619
127         incrementPendingOutboundBytes(entry.pendingSize, false);
128     }
129 
130     /**
131      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
132      * and so you will be able to handle them.
133      */
134     public void addFlush() {
135         // There is no need to process all entries if there was already a flush before and no new messages
136         // where added in the meantime.
137         //
138         // See https://github.com/netty/netty/issues/2577
139         Entry entry = unflushedEntry;
140         if (entry != null) {
141             if (flushedEntry == null) {
142                 // there is no flushedEntry yet, so start with the entry
143                 flushedEntry = entry;
144             }
145             do {
146                 flushed ++;
147                 if (!entry.promise.setUncancellable()) {
148                     // Was cancelled so make sure we free up memory and notify about the freed bytes
149                     int pending = entry.cancel();
150                     decrementPendingOutboundBytes(pending, false, true);
151                 }
152                 entry = entry.next;
153             } while (entry != null);
154 
155             // All flushed so reset unflushedEntry
156             unflushedEntry = null;
157         }
158     }
159 
160     /**
161      * Increment the pending bytes which will be written at some point.
162      * This method is thread-safe!
163      */
164     void incrementPendingOutboundBytes(long size) {
165         incrementPendingOutboundBytes(size, true);
166     }
167 
168     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
169         if (size == 0) {
170             return;
171         }
172 
173         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
174         if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
175             setUnwritable(invokeLater);
176         }
177     }
178 
179     /**
180      * Decrement the pending bytes which will be written at some point.
181      * This method is thread-safe!
182      */
183     void decrementPendingOutboundBytes(long size) {
184         decrementPendingOutboundBytes(size, true, true);
185     }
186 
187     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
188         if (size == 0) {
189             return;
190         }
191 
192         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
193         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
194             setWritable(invokeLater);
195         }
196     }
197 
198     private static long total(Object msg) {
199         if (msg instanceof ByteBuf) {
200             return ((ByteBuf) msg).readableBytes();
201         }
202         if (msg instanceof FileRegion) {
203             return ((FileRegion) msg).count();
204         }
205         if (msg instanceof ByteBufHolder) {
206             return ((ByteBufHolder) msg).content().readableBytes();
207         }
208         return -1;
209     }
210 
211     /**
212      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
213      */
214     public Object current() {
215         Entry entry = flushedEntry;
216         if (entry == null) {
217             return null;
218         }
219 
220         return entry.msg;
221     }
222 
223     /**
224      * Notify the {@link ChannelPromise} of the current message about writing progress.
225      */
226     public void progress(long amount) {
227         Entry e = flushedEntry;
228         assert e != null;
229         ChannelPromise p = e.promise;
230         if (p instanceof ChannelProgressivePromise) {
231             long progress = e.progress + amount;
232             e.progress = progress;
233             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
234         }
235     }
236 
237     /**
238      * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
239      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
240      * messages are ready to be handled.
241      */
242     public boolean remove() {
243         Entry e = flushedEntry;
244         if (e == null) {
245             clearNioBuffers();
246             return false;
247         }
248         Object msg = e.msg;
249 
250         ChannelPromise promise = e.promise;
251         int size = e.pendingSize;
252 
253         removeEntry(e);
254 
255         if (!e.cancelled) {
256             // only release message, notify and decrement if it was not canceled before.
257             ReferenceCountUtil.safeRelease(msg);
258             safeSuccess(promise);
259             decrementPendingOutboundBytes(size, false, true);
260         }
261 
262         // recycle the entry
263         e.recycle();
264 
265         return true;
266     }
267 
268     /**
269      * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
270      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
271      * {@code false} to signal that no more messages are ready to be handled.
272      */
273     public boolean remove(Throwable cause) {
274         return remove0(cause, true);
275     }
276 
277     private boolean remove0(Throwable cause, boolean notifyWritability) {
278         Entry e = flushedEntry;
279         if (e == null) {
280             clearNioBuffers();
281             return false;
282         }
283         Object msg = e.msg;
284 
285         ChannelPromise promise = e.promise;
286         int size = e.pendingSize;
287 
288         removeEntry(e);
289 
290         if (!e.cancelled) {
291             // only release message, fail and decrement if it was not canceled before.
292             ReferenceCountUtil.safeRelease(msg);
293 
294             safeFail(promise, cause);
295             decrementPendingOutboundBytes(size, false, notifyWritability);
296         }
297 
298         // recycle the entry
299         e.recycle();
300 
301         return true;
302     }
303 
304     private void removeEntry(Entry e) {
305         if (-- flushed == 0) {
306             // processed everything
307             flushedEntry = null;
308             if (e == tailEntry) {
309                 tailEntry = null;
310                 unflushedEntry = null;
311             }
312         } else {
313             flushedEntry = e.next;
314         }
315     }
316 
317     /**
318      * Removes the fully written entries and update the reader index of the partially written entry.
319      * This operation assumes all messages in this buffer is {@link ByteBuf}.
320      */
321     public void removeBytes(long writtenBytes) {
322         for (;;) {
323             Object msg = current();
324             if (!(msg instanceof ByteBuf)) {
325                 assert writtenBytes == 0;
326                 break;
327             }
328 
329             final ByteBuf buf = (ByteBuf) msg;
330             final int readerIndex = buf.readerIndex();
331             final int readableBytes = buf.writerIndex() - readerIndex;
332 
333             if (readableBytes <= writtenBytes) {
334                 if (writtenBytes != 0) {
335                     progress(readableBytes);
336                     writtenBytes -= readableBytes;
337                 }
338                 remove();
339             } else { // readableBytes > writtenBytes
340                 if (writtenBytes != 0) {
341                     buf.readerIndex(readerIndex + (int) writtenBytes);
342                     progress(writtenBytes);
343                 }
344                 break;
345             }
346         }
347         clearNioBuffers();
348     }
349 
350     // Clear all ByteBuffer from the array so these can be GC'ed.
351     // See https://github.com/netty/netty/issues/3837
352     private void clearNioBuffers() {
353         int count = nioBufferCount;
354         if (count > 0) {
355             nioBufferCount = 0;
356             Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
357         }
358     }
359 
360     /**
361      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
362      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
363      * array and the total number of readable bytes of the NIO buffers respectively.
364      * <p>
365      * Note that the returned array is reused and thus should not escape
366      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
367      * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
368      * </p>
369      */
370     public ByteBuffer[] nioBuffers() {
371         long nioBufferSize = 0;
372         int nioBufferCount = 0;
373         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
374         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
375         Entry entry = flushedEntry;
376         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
377             if (!entry.cancelled) {
378                 ByteBuf buf = (ByteBuf) entry.msg;
379                 final int readerIndex = buf.readerIndex();
380                 final int readableBytes = buf.writerIndex() - readerIndex;
381 
382                 if (readableBytes > 0) {
383                     if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
384                         // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the
385                         // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then
386                         // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will
387                         // raise an IOException. On Linux it may work depending on the
388                         // architecture and kernel but to be safe we also enforce the limit here.
389                         // This said writing more the Integer.MAX_VALUE is not a good idea anyway.
390                         //
391                         // See also:
392                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
393                         // - http://linux.die.net/man/2/writev
394                         break;
395                     }
396                     nioBufferSize += readableBytes;
397                     int count = entry.count;
398                     if (count == -1) {
399                         //noinspection ConstantValueVariableUse
400                         entry.count = count =  buf.nioBufferCount();
401                     }
402                     int neededSpace = nioBufferCount + count;
403                     if (neededSpace > nioBuffers.length) {
404                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
405                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
406                     }
407                     if (count == 1) {
408                         ByteBuffer nioBuf = entry.buf;
409                         if (nioBuf == null) {
410                             // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
411                             // derived buffer
412                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
413                         }
414                         nioBuffers[nioBufferCount ++] = nioBuf;
415                     } else {
416                         ByteBuffer[] nioBufs = entry.bufs;
417                         if (nioBufs == null) {
418                             // cached ByteBuffers as they may be expensive to create in terms
419                             // of Object allocation
420                             entry.bufs = nioBufs = buf.nioBuffers();
421                         }
422                         nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
423                     }
424                 }
425             }
426             entry = entry.next;
427         }
428         this.nioBufferCount = nioBufferCount;
429         this.nioBufferSize = nioBufferSize;
430 
431         return nioBuffers;
432     }
433 
434     private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
435         for (ByteBuffer nioBuf: nioBufs) {
436             if (nioBuf == null) {
437                 break;
438             }
439             nioBuffers[nioBufferCount ++] = nioBuf;
440         }
441         return nioBufferCount;
442     }
443 
444     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
445         int newCapacity = array.length;
446         do {
447             // double capacity until it is big enough
448             // See https://github.com/netty/netty/issues/1890
449             newCapacity <<= 1;
450 
451             if (newCapacity < 0) {
452                 throw new IllegalStateException();
453             }
454 
455         } while (neededSpace > newCapacity);
456 
457         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
458         System.arraycopy(array, 0, newArray, 0, size);
459 
460         return newArray;
461     }
462 
463     /**
464      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
465      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
466      * was called.
467      */
468     public int nioBufferCount() {
469         return nioBufferCount;
470     }
471 
472     /**
473      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
474      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
475      * was called.
476      */
477     public long nioBufferSize() {
478         return nioBufferSize;
479     }
480 
481     /**
482      * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
483      * not exceed the write watermark of the {@link Channel} and
484      * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
485      * {@code false}.
486      */
487     public boolean isWritable() {
488         return unwritable == 0;
489     }
490 
491     /**
492      * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
493      * {@code true}.
494      */
495     public boolean getUserDefinedWritability(int index) {
496         return (unwritable & writabilityMask(index)) == 0;
497     }
498 
499     /**
500      * Sets a user-defined writability flag at the specified index.
501      */
502     public void setUserDefinedWritability(int index, boolean writable) {
503         if (writable) {
504             setUserDefinedWritability(index);
505         } else {
506             clearUserDefinedWritability(index);
507         }
508     }
509 
510     private void setUserDefinedWritability(int index) {
511         final int mask = ~writabilityMask(index);
512         for (;;) {
513             final int oldValue = unwritable;
514             final int newValue = oldValue & mask;
515             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
516                 if (oldValue != 0 && newValue == 0) {
517                     fireChannelWritabilityChanged(true);
518                 }
519                 break;
520             }
521         }
522     }
523 
524     private void clearUserDefinedWritability(int index) {
525         final int mask = writabilityMask(index);
526         for (;;) {
527             final int oldValue = unwritable;
528             final int newValue = oldValue | mask;
529             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
530                 if (oldValue == 0 && newValue != 0) {
531                     fireChannelWritabilityChanged(true);
532                 }
533                 break;
534             }
535         }
536     }
537 
538     private static int writabilityMask(int index) {
539         if (index < 1 || index > 31) {
540             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
541         }
542         return 1 << index;
543     }
544 
545     private void setWritable(boolean invokeLater) {
546         for (;;) {
547             final int oldValue = unwritable;
548             final int newValue = oldValue & ~1;
549             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
550                 if (oldValue != 0 && newValue == 0) {
551                     fireChannelWritabilityChanged(invokeLater);
552                 }
553                 break;
554             }
555         }
556     }
557 
558     private void setUnwritable(boolean invokeLater) {
559         for (;;) {
560             final int oldValue = unwritable;
561             final int newValue = oldValue | 1;
562             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
563                 if (oldValue == 0 && newValue != 0) {
564                     fireChannelWritabilityChanged(invokeLater);
565                 }
566                 break;
567             }
568         }
569     }
570 
571     private void fireChannelWritabilityChanged(boolean invokeLater) {
572         final ChannelPipeline pipeline = channel.pipeline();
573         if (invokeLater) {
574             Runnable task = fireChannelWritabilityChangedTask;
575             if (task == null) {
576                 fireChannelWritabilityChangedTask = task = new Runnable() {
577                     @Override
578                     public void run() {
579                         pipeline.fireChannelWritabilityChanged();
580                     }
581                 };
582             }
583             channel.eventLoop().execute(task);
584         } else {
585             pipeline.fireChannelWritabilityChanged();
586         }
587     }
588 
589     /**
590      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
591      */
592     public int size() {
593         return flushed;
594     }
595 
596     /**
597      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
598      * otherwise.
599      */
600     public boolean isEmpty() {
601         return flushed == 0;
602     }
603 
604     void failFlushed(Throwable cause, boolean notify) {
605         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
606         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
607         // indirectly (usually by closing the channel.)
608         //
609         // See https://github.com/netty/netty/issues/1501
610         if (inFail) {
611             return;
612         }
613 
614         try {
615             inFail = true;
616             for (;;) {
617                 if (!remove0(cause, notify)) {
618                     break;
619                 }
620             }
621         } finally {
622             inFail = false;
623         }
624     }
625 
626     void close(final Throwable cause, final boolean allowChannelOpen) {
627         if (inFail) {
628             channel.eventLoop().execute(new Runnable() {
629                 @Override
630                 public void run() {
631                     close(cause, allowChannelOpen);
632                 }
633             });
634             return;
635         }
636 
637         inFail = true;
638 
639         if (!allowChannelOpen && channel.isOpen()) {
640             throw new IllegalStateException("close() must be invoked after the channel is closed.");
641         }
642 
643         if (!isEmpty()) {
644             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
645         }
646 
647         // Release all unflushed messages.
648         try {
649             Entry e = unflushedEntry;
650             while (e != null) {
651                 // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
652                 int size = e.pendingSize;
653                 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
654 
655                 if (!e.cancelled) {
656                     ReferenceCountUtil.safeRelease(e.msg);
657                     safeFail(e.promise, cause);
658                 }
659                 e = e.recycleAndGetNext();
660             }
661         } finally {
662             inFail = false;
663         }
664         clearNioBuffers();
665     }
666 
667     void close(ClosedChannelException cause) {
668         close(cause, false);
669     }
670 
671     private static void safeSuccess(ChannelPromise promise) {
672         // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
673         // false.
674         PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
675     }
676 
677     private static void safeFail(ChannelPromise promise, Throwable cause) {
678         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
679         // false.
680         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
681     }
682 
683     @Deprecated
684     public void recycle() {
685         // NOOP
686     }
687 
688     public long totalPendingWriteBytes() {
689         return totalPendingSize;
690     }
691 
692     /**
693      * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
694      * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
695      */
696     public long bytesBeforeUnwritable() {
697         long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
698         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
699         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
700         // together. totalPendingSize will be updated before isWritable().
701         if (bytes > 0) {
702             return isWritable() ? bytes : 0;
703         }
704         return 0;
705     }
706 
707     /**
708      * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
709      * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
710      */
711     public long bytesBeforeWritable() {
712         long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
713         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
714         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
715         // together. totalPendingSize will be updated before isWritable().
716         if (bytes > 0) {
717             return isWritable() ? 0 : bytes;
718         }
719         return 0;
720     }
721 
722     /**
723      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
724      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
725      * returns {@code false} or there are no more flushed messages to process.
726      */
727     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
728         if (processor == null) {
729             throw new NullPointerException("processor");
730         }
731 
732         Entry entry = flushedEntry;
733         if (entry == null) {
734             return;
735         }
736 
737         do {
738             if (!entry.cancelled) {
739                 if (!processor.processMessage(entry.msg)) {
740                     return;
741                 }
742             }
743             entry = entry.next;
744         } while (isFlushedEntry(entry));
745     }
746 
747     private boolean isFlushedEntry(Entry e) {
748         return e != null && e != unflushedEntry;
749     }
750 
751     public interface MessageProcessor {
752         /**
753          * Will be called for each flushed message until it either there are no more flushed messages or this
754          * method returns {@code false}.
755          */
756         boolean processMessage(Object msg) throws Exception;
757     }
758 
759     static final class Entry {
760         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
761             @Override
762             protected Entry newObject(Handle handle) {
763                 return new Entry(handle);
764             }
765         };
766 
767         private final Handle handle;
768         Entry next;
769         Object msg;
770         ByteBuffer[] bufs;
771         ByteBuffer buf;
772         ChannelPromise promise;
773         long progress;
774         long total;
775         int pendingSize;
776         int count = -1;
777         boolean cancelled;
778 
779         private Entry(Handle handle) {
780             this.handle = handle;
781         }
782 
783         static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
784             Entry entry = RECYCLER.get();
785             entry.msg = msg;
786             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
787             entry.total = total;
788             entry.promise = promise;
789             return entry;
790         }
791 
792         int cancel() {
793             if (!cancelled) {
794                 cancelled = true;
795                 int pSize = pendingSize;
796 
797                 // release message and replace with an empty buffer
798                 ReferenceCountUtil.safeRelease(msg);
799                 msg = Unpooled.EMPTY_BUFFER;
800 
801                 pendingSize = 0;
802                 total = 0;
803                 progress = 0;
804                 bufs = null;
805                 buf = null;
806                 return pSize;
807             }
808             return 0;
809         }
810 
811         void recycle() {
812             next = null;
813             bufs = null;
814             buf = null;
815             msg = null;
816             promise = null;
817             progress = 0;
818             total = 0;
819             pendingSize = 0;
820             count = -1;
821             cancelled = false;
822             RECYCLER.recycle(this, handle);
823         }
824 
825         Entry recycleAndGetNext() {
826             Entry next = this.next;
827             recycle();
828             return next;
829         }
830     }
831 }