View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.util.concurrent.EventExecutor;
20  import io.netty5.util.concurrent.FastThreadLocal;
21  import io.netty5.util.concurrent.Promise;
22  import io.netty5.util.internal.ObjectPool;
23  import io.netty5.util.internal.ObjectPool.Handle;
24  import io.netty5.util.internal.PromiseNotificationUtil;
25  import io.netty5.util.internal.SilentDispose;
26  import io.netty5.util.internal.SystemPropertyUtil;
27  import io.netty5.util.internal.logging.InternalLogger;
28  import io.netty5.util.internal.logging.InternalLoggerFactory;
29  
30  import java.nio.ByteBuffer;
31  import java.util.Arrays;
32  
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
37   * outbound write requests.
38   * <p>
39   * All methods must be called by a transport implementation from an I/O thread, except the following ones:
40   * <ul>
41   * <li>{@link #totalPendingWriteBytes()}</li>
42   * </ul>
43   * </p>
44   */
45  public final class ChannelOutboundBuffer {
46      // Assuming a 64-bit JVM:
47      //  - 16 bytes object header
48      //  - 6 reference fields
49      //  - 2 long fields
50      //  - 2 int fields
51      //  - 1 boolean field
52      //  - padding
53      static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
54              SystemPropertyUtil.getInt("io.netty5.transport.outboundBufferEntrySizeOverhead", 96);
55  
56      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
57  
58      private static final FastThreadLocal<BufferCache> NIO_BUFFERS = new FastThreadLocal<>() {
59          @Override
60          protected BufferCache initialValue() {
61              BufferCache cache = new BufferCache();
62              cache.buffers = new ByteBuffer[1024];
63              return cache;
64          }
65      };
66  
67      private final EventExecutor executor;
68  
69      // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
70      //
71      // The Entry that is the first in the linked-list structure that was flushed
72      private Entry flushedEntry;
73      // The Entry which is the first unflushed in the linked-list structure
74      private Entry unflushedEntry;
75      // The Entry which represents the tail of the buffer
76      private Entry tailEntry;
77      // The number of flushed entries that are not written yet
78      private int flushed;
79  
80      private int nioBufferCount;
81      private long nioBufferSize;
82  
83      private boolean inFail;
84  
85      // We use a volatile only as its single-writer, multiple reader
86      private volatile long totalPendingSize;
87  
88      @SuppressWarnings("UnusedDeclaration")
89      ChannelOutboundBuffer(EventExecutor executor) {
90          this.executor = executor;
91      }
92  
93      private void incrementPendingOutboundBytes(long size) {
94          if (size == 0) {
95              return;
96          }
97  
98          // Single-writer only so no atomic operation needed.
99          totalPendingSize += size;
100     }
101 
102     private void decrementPendingOutboundBytes(long size) {
103         if (size == 0) {
104             return;
105         }
106 
107         // Single-writer only so no atomic operation needed.
108         totalPendingSize -= size;
109     }
110 
111     /**
112      * Add given message to this {@link ChannelOutboundBuffer}. The given {@link Promise} will be notified once
113      * the message was written.
114      */
115     public void addMessage(Object msg, int size, Promise<Void> promise) {
116         assert executor.inEventLoop();
117         Entry entry = Entry.newInstance(msg, size, total(msg), promise);
118         if (tailEntry == null) {
119             flushedEntry = null;
120         } else {
121             Entry tail = tailEntry;
122             tail.next = entry;
123         }
124         tailEntry = entry;
125         if (unflushedEntry == null) {
126             unflushedEntry = entry;
127         }
128 
129         // increment pending bytes after adding message to the unflushed arrays.
130         // See https://github.com/netty/netty/issues/1619
131         incrementPendingOutboundBytes(entry.pendingSize);
132     }
133 
134     /**
135      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
136      * and so you will be able to handle them.
137      */
138     public void addFlush() {
139         assert executor.inEventLoop();
140 
141         // There is no need to process all entries if there was already a flush before and no new messages
142         // where added in the meantime.
143         //
144         // See https://github.com/netty/netty/issues/2577
145         Entry entry = unflushedEntry;
146         if (entry != null) {
147             if (flushedEntry == null) {
148                 // there is no flushedEntry yet, so start with the entry
149                 flushedEntry = entry;
150             }
151 
152             Entry prev = null;
153             do {
154                 if (!entry.promise.setUncancellable()) {
155                     // Was cancelled so make sure we free up memory, unlink and notify about the freed bytes
156                     int pending = entry.cancel();
157                     if (prev == null) {
158                         // It's the first entry, drop it
159                         flushedEntry = entry.next;
160                     } else {
161                         // Remove te entry from the linked list.
162                         prev.next = entry.next;
163                     }
164                     Entry next = entry.next;
165                     entry.recycle();
166                     entry = next;
167 
168                     decrementPendingOutboundBytes(pending);
169                 } else {
170                     flushed ++;
171                     prev = entry;
172                     entry = entry.next;
173                 }
174             } while (entry != null);
175 
176             // All flushed so reset unflushedEntry
177             unflushedEntry = null;
178         }
179     }
180 
181     private static long total(Object msg) {
182         if (msg instanceof Buffer) {
183             return ((Buffer) msg).readableBytes();
184         }
185         if (msg instanceof FileRegion) {
186             return ((FileRegion) msg).count();
187         }
188         return -1;
189     }
190 
191     /**
192      * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
193      */
194     public Object current() {
195         assert executor.inEventLoop();
196 
197         Entry entry = flushedEntry;
198         if (entry == null) {
199             return null;
200         }
201 
202         return entry.msg;
203     }
204 
205     /**
206      * Return the current message flush progress.
207      * @return {@code 0} if nothing was flushed before for the current message or there is no current message
208      */
209     public long currentProgress() {
210         assert executor.inEventLoop();
211 
212         Entry entry = flushedEntry;
213         if (entry == null) {
214             return 0;
215         }
216         return entry.progress;
217     }
218 
219     /**
220      * Notify the {@link Promise} of the current message about writing progress.
221      */
222     public void progress(long amount) {
223         assert executor.inEventLoop();
224 
225         Entry e = flushedEntry;
226         assert e != null;
227         e.progress += amount;
228     }
229 
230     /**
231      * Will remove the current message, mark its {@link Promise} as success and return {@code true}. If no
232      * flushed message exists at the time this method is called it will return {@code false} to signal that no more
233      * messages are ready to be handled.
234      */
235     public boolean remove() {
236         assert executor.inEventLoop();
237 
238         Entry e = flushedEntry;
239         if (e == null) {
240             clearNioBuffers();
241             return false;
242         }
243         Object msg = e.msg;
244 
245         Promise<Void> promise = e.promise;
246         int size = e.pendingSize;
247 
248         removeEntry(e);
249 
250         if (!e.cancelled) {
251             // only release message, notify and decrement if it was not canceled before.
252             SilentDispose.trySilentDispose(msg, logger);
253             safeSuccess(promise);
254             decrementPendingOutboundBytes(size);
255         }
256 
257         // recycle the entry
258         e.recycle();
259 
260         return true;
261     }
262 
263     /**
264      * Will remove the current message, mark its {@link Promise} as failure using the given {@link Throwable}
265      * and return {@code true}. If no   flushed message exists at the time this method is called it will return
266      * {@code false} to signal that no more messages are ready to be handled.
267      */
268     public boolean remove(Throwable cause) {
269         assert executor.inEventLoop();
270 
271         Entry e = flushedEntry;
272         if (e == null) {
273             clearNioBuffers();
274             return false;
275         }
276         Object msg = e.msg;
277 
278         Promise<Void> promise = e.promise;
279         int size = e.pendingSize;
280 
281         removeEntry(e);
282 
283         if (!e.cancelled) {
284             // only release message, fail and decrement if it was not canceled before.
285             SilentDispose.trySilentDispose(msg, logger);
286 
287             safeFail(promise, cause);
288             decrementPendingOutboundBytes(size);
289         }
290 
291         // recycle the entry
292         e.recycle();
293 
294         return true;
295     }
296 
297     private void removeEntry(Entry e) {
298         assert executor.inEventLoop();
299 
300         if (-- flushed == 0) {
301             // processed everything
302             flushedEntry = null;
303             if (e == tailEntry) {
304                 tailEntry = null;
305                 unflushedEntry = null;
306             }
307         } else {
308             flushedEntry = e.next;
309         }
310     }
311 
312     /**
313      * Removes the fully written entries and update the reader index of the partially written entry.
314      * This operation assumes all messages in this buffer are either {@link Buffer}s or {@link Buffer}s.
315      */
316     public void removeBytes(long writtenBytes) {
317         assert executor.inEventLoop();
318 
319         Object msg = current();
320         while (writtenBytes > 0 || hasZeroReadable(msg)) {
321             if (msg instanceof Buffer) {
322                 Buffer buf = (Buffer) msg;
323                 final int readableBytes = buf.readableBytes();
324                 if (readableBytes <= writtenBytes) {
325                     progress(readableBytes);
326                     writtenBytes -= readableBytes;
327                     remove();
328                 } else { // readableBytes > writtenBytes
329                     buf.readSplit(Math.toIntExact(writtenBytes)).close();
330                     progress(writtenBytes);
331                     break;
332                 }
333             } else {
334                 break; // Don't know how to process this message. Might be null.
335             }
336             msg = current();
337         }
338         clearNioBuffers();
339     }
340 
341     private static boolean hasZeroReadable(Object msg) {
342         if (msg instanceof Buffer) {
343             return ((Buffer) msg).readableBytes() == 0;
344         }
345         return false;
346     }
347 
348     // Clear all ByteBuffer from the array so these can be GC'ed.
349     // See https://github.com/netty/netty/issues/3837
350     private void clearNioBuffers() {
351         int count = nioBufferCount;
352         if (count > 0) {
353             nioBufferCount = 0;
354             Arrays.fill(NIO_BUFFERS.get().buffers, 0, count, null);
355         }
356     }
357 
358     /**
359      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link Buffer} only.
360      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
361      * array and the total number of readable bytes of the NIO buffers respectively.
362      * <p>
363      * Note that the returned array is reused and thus should not escape
364      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
365      * </p>
366      */
367     public ByteBuffer[] nioBuffers() {
368         assert executor.inEventLoop();
369 
370         return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
371     }
372 
373     /**
374      * Returns an array of direct NIO buffers if the currently pending messages are made of {@link Buffer} only.
375      * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
376      * array and the total number of readable bytes of the NIO buffers respectively.
377      * <p>
378      * Note that the returned array is reused and thus should not escape
379      * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
380      * </p>
381      * @param maxCount The maximum amount of buffers that will be added to the return value.
382      * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
383      *                 value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
384      *                 in the return value to ensure write progress is made.
385      */
386     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
387         assert executor.inEventLoop();
388 
389         assert maxCount > 0;
390         assert maxBytes > 0;
391         long nioBufferSize = 0;
392         int nioBufferCount = 0;
393         BufferCache cache = NIO_BUFFERS.get();
394         cache.dataSize = 0;
395         cache.bufferCount = 0;
396         ByteBuffer[] nioBuffers = cache.buffers;
397 
398         Entry entry = flushedEntry;
399         while (isFlushedEntry(entry) && entry.msg instanceof Buffer) {
400             if (!entry.cancelled) {
401                 Buffer buf = (Buffer) entry.msg;
402                 if (buf.readableBytes() > 0) {
403                     int count = buf.forEachReadable(0, (index, component) -> {
404                         ByteBuffer byteBuffer = component.readableBuffer();
405                         if (cache.bufferCount > 0 && cache.dataSize + byteBuffer.remaining() > maxBytes) {
406                             // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least
407                             // one entry we stop populate the ByteBuffer array. This is done for 2 reasons:
408                             // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one
409                             // writev(...) call and so will return 'EINVAL', which will raise an IOException.
410                             // On Linux it may work depending on the architecture and kernel but to be safe we also
411                             // enforce the limit here.
412                             // 2. There is no sense in putting more data in the array than is likely to be accepted
413                             // by the OS.
414                             //
415                             // See also:
416                             // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
417                             // - https://linux.die.net//man/2/writev
418                             return false;
419                         }
420                         cache.dataSize += byteBuffer.remaining();
421                         ByteBuffer[] buffers = cache.buffers;
422                         int bufferCount = cache.bufferCount;
423                         if (buffers.length == bufferCount && bufferCount < maxCount) {
424                             buffers = cache.buffers = expandNioBufferArray(buffers, bufferCount + 1, bufferCount);
425                         }
426                         buffers[cache.bufferCount] = byteBuffer;
427                         bufferCount++;
428                         cache.bufferCount = bufferCount;
429                         return bufferCount < maxCount;
430                     });
431                     if (count < 0) {
432                         break;
433                     }
434                 }
435             }
436             entry = entry.next;
437         }
438         this.nioBufferCount = nioBufferCount + cache.bufferCount;
439         this.nioBufferSize = nioBufferSize + cache.dataSize;
440 
441         return nioBuffers;
442     }
443 
444     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
445         int newCapacity = array.length;
446         do {
447             // double capacity until it is big enough
448             // See https://github.com/netty/netty/issues/1890
449             newCapacity <<= 1;
450 
451             if (newCapacity < 0) {
452                 throw new IllegalStateException();
453             }
454 
455         } while (neededSpace > newCapacity);
456 
457         ByteBuffer[] newArray = new ByteBuffer[newCapacity];
458         System.arraycopy(array, 0, newArray, 0, size);
459 
460         return newArray;
461     }
462 
463     /**
464      * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
465      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
466      * was called.
467      */
468     public int nioBufferCount() {
469         assert executor.inEventLoop();
470 
471         return nioBufferCount;
472     }
473 
474     /**
475      * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
476      * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
477      * was called.
478      */
479     public long nioBufferSize() {
480         assert executor.inEventLoop();
481 
482         return nioBufferSize;
483     }
484 
485     /**
486      * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
487      */
488     public int size() {
489         assert executor.inEventLoop();
490 
491         return flushed;
492     }
493 
494     /**
495      * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
496      * otherwise.
497      */
498     public boolean isEmpty() {
499         assert executor.inEventLoop();
500 
501         return flushed == 0;
502     }
503 
504     void failFlushedAndClose(Throwable failCause, Throwable closeCause) {
505         assert executor.inEventLoop();
506 
507         failFlushed(failCause);
508         close(closeCause);
509     }
510 
511     void failFlushed(Throwable cause) {
512         assert executor.inEventLoop();
513 
514         // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
515         // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
516         // indirectly (usually by closing the channel.)
517         //
518         // See https://github.com/netty/netty/issues/1501
519         if (inFail) {
520             return;
521         }
522 
523         try {
524             inFail = true;
525             for (;;) {
526                 if (!remove(cause)) {
527                     break;
528                 }
529             }
530         } finally {
531             inFail = false;
532         }
533     }
534 
535     private void close(final Throwable cause) {
536         assert executor.inEventLoop();
537 
538         if (inFail) {
539             executor.execute(() -> close(cause));
540             return;
541         }
542 
543         inFail = true;
544 
545         if (!isEmpty()) {
546             throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
547         }
548 
549         // Release all unflushed messages.
550         try {
551             Entry e = unflushedEntry;
552             while (e != null) {
553                 int size = e.pendingSize;
554 
555                 decrementPendingOutboundBytes(size);
556 
557                 if (!e.cancelled) {
558                     SilentDispose.dispose(e.msg, logger);
559                     safeFail(e.promise, cause);
560                 }
561                 e = e.recycleAndGetNext();
562             }
563         } finally {
564             inFail = false;
565         }
566         clearNioBuffers();
567     }
568 
569     private static void safeSuccess(Promise<Void> promise) {
570         PromiseNotificationUtil.trySuccess(promise, null, logger);
571     }
572 
573     private static void safeFail(Promise<Void> promise, Throwable cause) {
574         PromiseNotificationUtil.tryFailure(promise, cause, logger);
575     }
576 
577     public long totalPendingWriteBytes() {
578         return totalPendingSize;
579     }
580     /**
581      * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
582      * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
583      * returns {@code false} or there are no more flushed messages to process.
584      */
585     public <T extends Exception> void forEachFlushedMessage(MessageProcessor<T> processor) throws T {
586         assert executor.inEventLoop();
587 
588         requireNonNull(processor, "processor");
589 
590         Entry entry = flushedEntry;
591         if (entry == null) {
592             return;
593         }
594 
595         do {
596             if (!entry.cancelled) {
597                 if (!processor.processMessage(entry.msg)) {
598                     return;
599                 }
600             }
601             entry = entry.next;
602         } while (isFlushedEntry(entry));
603     }
604 
605     private boolean isFlushedEntry(Entry e) {
606         return e != null && e != unflushedEntry;
607     }
608 
609     public interface MessageProcessor<T extends Exception> {
610         /**
611          * Will be called for each flushed message until it either there are no more flushed messages or this
612          * method returns {@code false}.
613          */
614         boolean processMessage(Object msg) throws T;
615     }
616 
617     private static final class Entry {
618         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(Entry::new);
619 
620         private final Handle<Entry> handle;
621         Entry next;
622         Object msg;
623         ByteBuffer[] bufs;
624         ByteBuffer buf;
625         Promise<Void> promise;
626         long progress;
627         long total;
628         int pendingSize;
629         int count = -1;
630         boolean cancelled;
631 
632         private Entry(Handle<Entry> handle) {
633             this.handle = handle;
634         }
635 
636         static Entry newInstance(Object msg, int size, long total, Promise<Void> promise) {
637             Entry entry = RECYCLER.get();
638             entry.msg = msg;
639             entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
640             entry.total = total;
641             entry.promise = promise;
642             return entry;
643         }
644 
645         int cancel() {
646             if (!cancelled) {
647                 cancelled = true;
648                 int pSize = pendingSize;
649 
650                 // release message and replace with null
651                 SilentDispose.dispose(msg, logger);
652                 msg = null;
653 
654                 pendingSize = 0;
655                 total = 0;
656                 progress = 0;
657                 bufs = null;
658                 buf = null;
659                 return pSize;
660             }
661             return 0;
662         }
663 
664         void recycle() {
665             next = null;
666             bufs = null;
667             buf = null;
668             msg = null;
669             promise = null;
670             progress = 0;
671             total = 0;
672             pendingSize = 0;
673             count = -1;
674             cancelled = false;
675             handle.recycle(this);
676         }
677 
678         Entry recycleAndGetNext() {
679             Entry next = this.next;
680             recycle();
681             return next;
682         }
683     }
684 
685     /**
686      * Thread-local cache of {@link ByteBuffer} array, and processing meta-data.
687      */
688     private static final class BufferCache {
689         ByteBuffer[] buffers;
690         long dataSize;
691         int bufferCount;
692     }
693 }