1 /*
2 * Copyright 2013 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.channel;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.util.concurrent.EventExecutor;
20 import io.netty5.util.concurrent.FastThreadLocal;
21 import io.netty5.util.concurrent.Promise;
22 import io.netty5.util.internal.ObjectPool;
23 import io.netty5.util.internal.ObjectPool.Handle;
24 import io.netty5.util.internal.PromiseNotificationUtil;
25 import io.netty5.util.internal.SilentDispose;
26 import io.netty5.util.internal.SystemPropertyUtil;
27 import io.netty5.util.internal.logging.InternalLogger;
28 import io.netty5.util.internal.logging.InternalLoggerFactory;
29
30 import java.nio.ByteBuffer;
31 import java.util.Arrays;
32
33 import static java.util.Objects.requireNonNull;
34
35 /**
36 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
37 * outbound write requests.
38 * <p>
39 * All methods must be called by a transport implementation from an I/O thread, except the following ones:
40 * <ul>
41 * <li>{@link #totalPendingWriteBytes()}</li>
42 * </ul>
43 * </p>
44 */
45 public final class ChannelOutboundBuffer {
46 // Assuming a 64-bit JVM:
47 // - 16 bytes object header
48 // - 6 reference fields
49 // - 2 long fields
50 // - 2 int fields
51 // - 1 boolean field
52 // - padding
53 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
54 SystemPropertyUtil.getInt("io.netty5.transport.outboundBufferEntrySizeOverhead", 96);
55
56 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
57
58 private static final FastThreadLocal<BufferCache> NIO_BUFFERS = new FastThreadLocal<>() {
59 @Override
60 protected BufferCache initialValue() {
61 BufferCache cache = new BufferCache();
62 cache.buffers = new ByteBuffer[1024];
63 return cache;
64 }
65 };
66
// Executor used for the inEventLoop() assertions and for re-scheduling close() when it re-enters.
private final EventExecutor executor;

// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// The first flushed Entry of the linked list, or null if nothing is flushed
private Entry flushedEntry;
// The first unflushed Entry of the linked list, or null if everything was flushed
private Entry unflushedEntry;
// The Entry at the tail of the linked list, or null if the list is empty
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;

// Results of the most recent nioBuffers(...) call: number of ByteBuffers and their total readable bytes
private int nioBufferCount;
private long nioBufferSize;

// Re-entrancy guard shared by failFlushed(...) and close(...)
private boolean inFail;

// Volatile is sufficient here because it's single-writer, multiple-reader
private volatile long totalPendingSize;

/**
 * Creates an empty outbound buffer bound to the given executor.
 *
 * @param executor the executor used for the event-loop assertions and for deferring a re-entrant close
 * @throws NullPointerException if {@code executor} is {@code null}
 */
@SuppressWarnings("UnusedDeclaration")
ChannelOutboundBuffer(EventExecutor executor) {
    // Fail fast instead of surfacing the problem later from an assertion or from close().
    this.executor = requireNonNull(executor, "executor");
}
92
93 private void incrementPendingOutboundBytes(long size) {
94 if (size == 0) {
95 return;
96 }
97
98 // Single-writer only so no atomic operation needed.
99 totalPendingSize += size;
100 }
101
102 private void decrementPendingOutboundBytes(long size) {
103 if (size == 0) {
104 return;
105 }
106
107 // Single-writer only so no atomic operation needed.
108 totalPendingSize -= size;
109 }
110
111 /**
112 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link Promise} will be notified once
113 * the message was written.
114 */
115 public void addMessage(Object msg, int size, Promise<Void> promise) {
116 assert executor.inEventLoop();
117 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
118 if (tailEntry == null) {
119 flushedEntry = null;
120 } else {
121 Entry tail = tailEntry;
122 tail.next = entry;
123 }
124 tailEntry = entry;
125 if (unflushedEntry == null) {
126 unflushedEntry = entry;
127 }
128
129 // increment pending bytes after adding message to the unflushed arrays.
130 // See https://github.com/netty/netty/issues/1619
131 incrementPendingOutboundBytes(entry.pendingSize);
132 }
133
134 /**
135 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
136 * and so you will be able to handle them.
137 */
138 public void addFlush() {
139 assert executor.inEventLoop();
140
141 // There is no need to process all entries if there was already a flush before and no new messages
142 // where added in the meantime.
143 //
144 // See https://github.com/netty/netty/issues/2577
145 Entry entry = unflushedEntry;
146 if (entry != null) {
147 if (flushedEntry == null) {
148 // there is no flushedEntry yet, so start with the entry
149 flushedEntry = entry;
150 }
151
152 Entry prev = null;
153 do {
154 if (!entry.promise.setUncancellable()) {
155 // Was cancelled so make sure we free up memory, unlink and notify about the freed bytes
156 int pending = entry.cancel();
157 if (prev == null) {
158 // It's the first entry, drop it
159 flushedEntry = entry.next;
160 } else {
161 // Remove te entry from the linked list.
162 prev.next = entry.next;
163 }
164 Entry next = entry.next;
165 entry.recycle();
166 entry = next;
167
168 decrementPendingOutboundBytes(pending);
169 } else {
170 flushed ++;
171 prev = entry;
172 entry = entry.next;
173 }
174 } while (entry != null);
175
176 // All flushed so reset unflushedEntry
177 unflushedEntry = null;
178 }
179 }
180
181 private static long total(Object msg) {
182 if (msg instanceof Buffer) {
183 return ((Buffer) msg).readableBytes();
184 }
185 if (msg instanceof FileRegion) {
186 return ((FileRegion) msg).count();
187 }
188 return -1;
189 }
190
191 /**
192 * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
193 */
194 public Object current() {
195 assert executor.inEventLoop();
196
197 Entry entry = flushedEntry;
198 if (entry == null) {
199 return null;
200 }
201
202 return entry.msg;
203 }
204
205 /**
206 * Return the current message flush progress.
207 * @return {@code 0} if nothing was flushed before for the current message or there is no current message
208 */
209 public long currentProgress() {
210 assert executor.inEventLoop();
211
212 Entry entry = flushedEntry;
213 if (entry == null) {
214 return 0;
215 }
216 return entry.progress;
217 }
218
219 /**
220 * Notify the {@link Promise} of the current message about writing progress.
221 */
222 public void progress(long amount) {
223 assert executor.inEventLoop();
224
225 Entry e = flushedEntry;
226 assert e != null;
227 e.progress += amount;
228 }
229
230 /**
231 * Will remove the current message, mark its {@link Promise} as success and return {@code true}. If no
232 * flushed message exists at the time this method is called it will return {@code false} to signal that no more
233 * messages are ready to be handled.
234 */
235 public boolean remove() {
236 assert executor.inEventLoop();
237
238 Entry e = flushedEntry;
239 if (e == null) {
240 clearNioBuffers();
241 return false;
242 }
243 Object msg = e.msg;
244
245 Promise<Void> promise = e.promise;
246 int size = e.pendingSize;
247
248 removeEntry(e);
249
250 if (!e.cancelled) {
251 // only release message, notify and decrement if it was not canceled before.
252 SilentDispose.trySilentDispose(msg, logger);
253 safeSuccess(promise);
254 decrementPendingOutboundBytes(size);
255 }
256
257 // recycle the entry
258 e.recycle();
259
260 return true;
261 }
262
263 /**
264 * Will remove the current message, mark its {@link Promise} as failure using the given {@link Throwable}
265 * and return {@code true}. If no flushed message exists at the time this method is called it will return
266 * {@code false} to signal that no more messages are ready to be handled.
267 */
268 public boolean remove(Throwable cause) {
269 assert executor.inEventLoop();
270
271 Entry e = flushedEntry;
272 if (e == null) {
273 clearNioBuffers();
274 return false;
275 }
276 Object msg = e.msg;
277
278 Promise<Void> promise = e.promise;
279 int size = e.pendingSize;
280
281 removeEntry(e);
282
283 if (!e.cancelled) {
284 // only release message, fail and decrement if it was not canceled before.
285 SilentDispose.trySilentDispose(msg, logger);
286
287 safeFail(promise, cause);
288 decrementPendingOutboundBytes(size);
289 }
290
291 // recycle the entry
292 e.recycle();
293
294 return true;
295 }
296
297 private void removeEntry(Entry e) {
298 assert executor.inEventLoop();
299
300 if (-- flushed == 0) {
301 // processed everything
302 flushedEntry = null;
303 if (e == tailEntry) {
304 tailEntry = null;
305 unflushedEntry = null;
306 }
307 } else {
308 flushedEntry = e.next;
309 }
310 }
311
312 /**
313 * Removes the fully written entries and update the reader index of the partially written entry.
314 * This operation assumes all messages in this buffer are either {@link Buffer}s or {@link Buffer}s.
315 */
316 public void removeBytes(long writtenBytes) {
317 assert executor.inEventLoop();
318
319 Object msg = current();
320 while (writtenBytes > 0 || hasZeroReadable(msg)) {
321 if (msg instanceof Buffer) {
322 Buffer buf = (Buffer) msg;
323 final int readableBytes = buf.readableBytes();
324 if (readableBytes <= writtenBytes) {
325 progress(readableBytes);
326 writtenBytes -= readableBytes;
327 remove();
328 } else { // readableBytes > writtenBytes
329 buf.readSplit(Math.toIntExact(writtenBytes)).close();
330 progress(writtenBytes);
331 break;
332 }
333 } else {
334 break; // Don't know how to process this message. Might be null.
335 }
336 msg = current();
337 }
338 clearNioBuffers();
339 }
340
341 private static boolean hasZeroReadable(Object msg) {
342 if (msg instanceof Buffer) {
343 return ((Buffer) msg).readableBytes() == 0;
344 }
345 return false;
346 }
347
348 // Clear all ByteBuffer from the array so these can be GC'ed.
349 // See https://github.com/netty/netty/issues/3837
350 private void clearNioBuffers() {
351 int count = nioBufferCount;
352 if (count > 0) {
353 nioBufferCount = 0;
354 Arrays.fill(NIO_BUFFERS.get().buffers, 0, count, null);
355 }
356 }
357
358 /**
359 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link Buffer} only.
360 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
361 * array and the total number of readable bytes of the NIO buffers respectively.
362 * <p>
363 * Note that the returned array is reused and thus should not escape
364 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
365 * </p>
366 */
367 public ByteBuffer[] nioBuffers() {
368 assert executor.inEventLoop();
369
370 return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
371 }
372
373 /**
374 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link Buffer} only.
375 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
376 * array and the total number of readable bytes of the NIO buffers respectively.
377 * <p>
378 * Note that the returned array is reused and thus should not escape
379 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
380 * </p>
381 * @param maxCount The maximum amount of buffers that will be added to the return value.
382 * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
383 * value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
384 * in the return value to ensure write progress is made.
385 */
386 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
387 assert executor.inEventLoop();
388
389 assert maxCount > 0;
390 assert maxBytes > 0;
391 long nioBufferSize = 0;
392 int nioBufferCount = 0;
393 BufferCache cache = NIO_BUFFERS.get();
394 cache.dataSize = 0;
395 cache.bufferCount = 0;
396 ByteBuffer[] nioBuffers = cache.buffers;
397
398 Entry entry = flushedEntry;
399 while (isFlushedEntry(entry) && entry.msg instanceof Buffer) {
400 if (!entry.cancelled) {
401 Buffer buf = (Buffer) entry.msg;
402 if (buf.readableBytes() > 0) {
403 int count = buf.forEachReadable(0, (index, component) -> {
404 ByteBuffer byteBuffer = component.readableBuffer();
405 if (cache.bufferCount > 0 && cache.dataSize + byteBuffer.remaining() > maxBytes) {
406 // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least
407 // one entry we stop populate the ByteBuffer array. This is done for 2 reasons:
408 // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one
409 // writev(...) call and so will return 'EINVAL', which will raise an IOException.
410 // On Linux it may work depending on the architecture and kernel but to be safe we also
411 // enforce the limit here.
412 // 2. There is no sense in putting more data in the array than is likely to be accepted
413 // by the OS.
414 //
415 // See also:
416 // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
417 // - https://linux.die.net//man/2/writev
418 return false;
419 }
420 cache.dataSize += byteBuffer.remaining();
421 ByteBuffer[] buffers = cache.buffers;
422 int bufferCount = cache.bufferCount;
423 if (buffers.length == bufferCount && bufferCount < maxCount) {
424 buffers = cache.buffers = expandNioBufferArray(buffers, bufferCount + 1, bufferCount);
425 }
426 buffers[cache.bufferCount] = byteBuffer;
427 bufferCount++;
428 cache.bufferCount = bufferCount;
429 return bufferCount < maxCount;
430 });
431 if (count < 0) {
432 break;
433 }
434 }
435 }
436 entry = entry.next;
437 }
438 this.nioBufferCount = nioBufferCount + cache.bufferCount;
439 this.nioBufferSize = nioBufferSize + cache.dataSize;
440
441 return nioBuffers;
442 }
443
444 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
445 int newCapacity = array.length;
446 do {
447 // double capacity until it is big enough
448 // See https://github.com/netty/netty/issues/1890
449 newCapacity <<= 1;
450
451 if (newCapacity < 0) {
452 throw new IllegalStateException();
453 }
454
455 } while (neededSpace > newCapacity);
456
457 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
458 System.arraycopy(array, 0, newArray, 0, size);
459
460 return newArray;
461 }
462
463 /**
464 * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
465 * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
466 * was called.
467 */
468 public int nioBufferCount() {
469 assert executor.inEventLoop();
470
471 return nioBufferCount;
472 }
473
474 /**
475 * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
476 * obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
477 * was called.
478 */
479 public long nioBufferSize() {
480 assert executor.inEventLoop();
481
482 return nioBufferSize;
483 }
484
485 /**
486 * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
487 */
488 public int size() {
489 assert executor.inEventLoop();
490
491 return flushed;
492 }
493
494 /**
495 * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
496 * otherwise.
497 */
498 public boolean isEmpty() {
499 assert executor.inEventLoop();
500
501 return flushed == 0;
502 }
503
504 void failFlushedAndClose(Throwable failCause, Throwable closeCause) {
505 assert executor.inEventLoop();
506
507 failFlushed(failCause);
508 close(closeCause);
509 }
510
511 void failFlushed(Throwable cause) {
512 assert executor.inEventLoop();
513
514 // Make sure that this method does not reenter. A listener added to the current promise can be notified by the
515 // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
516 // indirectly (usually by closing the channel.)
517 //
518 // See https://github.com/netty/netty/issues/1501
519 if (inFail) {
520 return;
521 }
522
523 try {
524 inFail = true;
525 for (;;) {
526 if (!remove(cause)) {
527 break;
528 }
529 }
530 } finally {
531 inFail = false;
532 }
533 }
534
535 private void close(final Throwable cause) {
536 assert executor.inEventLoop();
537
538 if (inFail) {
539 executor.execute(() -> close(cause));
540 return;
541 }
542
543 inFail = true;
544
545 if (!isEmpty()) {
546 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
547 }
548
549 // Release all unflushed messages.
550 try {
551 Entry e = unflushedEntry;
552 while (e != null) {
553 int size = e.pendingSize;
554
555 decrementPendingOutboundBytes(size);
556
557 if (!e.cancelled) {
558 SilentDispose.dispose(e.msg, logger);
559 safeFail(e.promise, cause);
560 }
561 e = e.recycleAndGetNext();
562 }
563 } finally {
564 inFail = false;
565 }
566 clearNioBuffers();
567 }
568
569 private static void safeSuccess(Promise<Void> promise) {
570 PromiseNotificationUtil.trySuccess(promise, null, logger);
571 }
572
573 private static void safeFail(Promise<Void> promise, Throwable cause) {
574 PromiseNotificationUtil.tryFailure(promise, cause, logger);
575 }
576
577 public long totalPendingWriteBytes() {
578 return totalPendingSize;
579 }
580 /**
581 * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
582 * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
583 * returns {@code false} or there are no more flushed messages to process.
584 */
585 public <T extends Exception> void forEachFlushedMessage(MessageProcessor<T> processor) throws T {
586 assert executor.inEventLoop();
587
588 requireNonNull(processor, "processor");
589
590 Entry entry = flushedEntry;
591 if (entry == null) {
592 return;
593 }
594
595 do {
596 if (!entry.cancelled) {
597 if (!processor.processMessage(entry.msg)) {
598 return;
599 }
600 }
601 entry = entry.next;
602 } while (isFlushedEntry(entry));
603 }
604
605 private boolean isFlushedEntry(Entry e) {
606 return e != null && e != unflushedEntry;
607 }
608
/**
 * Callback used by {@code forEachFlushedMessage} to visit flushed messages.
 *
 * @param <T> the type of exception the processor may throw
 */
@FunctionalInterface
public interface MessageProcessor<T extends Exception> {
    /**
     * Will be called for each flushed message until either there are no more flushed messages or this
     * method returns {@code false}.
     *
     * @param msg the flushed message to process
     * @return {@code true} to continue with the next flushed message, {@code false} to stop
     * @throws T if processing the message fails
     */
    boolean processMessage(Object msg) throws T;
}
616
617 private static final class Entry {
618 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(Entry::new);
619
620 private final Handle<Entry> handle;
621 Entry next;
622 Object msg;
623 ByteBuffer[] bufs;
624 ByteBuffer buf;
625 Promise<Void> promise;
626 long progress;
627 long total;
628 int pendingSize;
629 int count = -1;
630 boolean cancelled;
631
632 private Entry(Handle<Entry> handle) {
633 this.handle = handle;
634 }
635
636 static Entry newInstance(Object msg, int size, long total, Promise<Void> promise) {
637 Entry entry = RECYCLER.get();
638 entry.msg = msg;
639 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
640 entry.total = total;
641 entry.promise = promise;
642 return entry;
643 }
644
645 int cancel() {
646 if (!cancelled) {
647 cancelled = true;
648 int pSize = pendingSize;
649
650 // release message and replace with null
651 SilentDispose.dispose(msg, logger);
652 msg = null;
653
654 pendingSize = 0;
655 total = 0;
656 progress = 0;
657 bufs = null;
658 buf = null;
659 return pSize;
660 }
661 return 0;
662 }
663
664 void recycle() {
665 next = null;
666 bufs = null;
667 buf = null;
668 msg = null;
669 promise = null;
670 progress = 0;
671 total = 0;
672 pendingSize = 0;
673 count = -1;
674 cancelled = false;
675 handle.recycle(this);
676 }
677
678 Entry recycleAndGetNext() {
679 Entry next = this.next;
680 recycle();
681 return next;
682 }
683 }
684
685 /**
686 * Thread-local cache of {@link ByteBuffer} array, and processing meta-data.
687 */
688 private static final class BufferCache {
689 ByteBuffer[] buffers;
690 long dataSize;
691 int bufferCount;
692 }
693 }