1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.util.concurrent.EventExecutor;
20 import io.netty5.util.concurrent.FastThreadLocal;
21 import io.netty5.util.concurrent.Promise;
22 import io.netty5.util.internal.ObjectPool;
23 import io.netty5.util.internal.ObjectPool.Handle;
24 import io.netty5.util.internal.PromiseNotificationUtil;
25 import io.netty5.util.internal.SilentDispose;
26 import io.netty5.util.internal.SystemPropertyUtil;
27 import io.netty5.util.internal.logging.InternalLogger;
28 import io.netty5.util.internal.logging.InternalLoggerFactory;
29
30 import java.nio.ByteBuffer;
31 import java.util.Arrays;
32
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41
42
43
44
45 public final class ChannelOutboundBuffer {
46
47
48
49
50
51
52
53 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
54 SystemPropertyUtil.getInt("io.netty5.transport.outboundBufferEntrySizeOverhead", 96);
55
56 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
57
58 private static final FastThreadLocal<BufferCache> NIO_BUFFERS = new FastThreadLocal<>() {
59 @Override
60 protected BufferCache initialValue() {
61 BufferCache cache = new BufferCache();
62 cache.buffers = new ByteBuffer[1024];
63 return cache;
64 }
65 };
66
67 private final EventExecutor executor;
68
69
70
71
72 private Entry flushedEntry;
73
74 private Entry unflushedEntry;
75
76 private Entry tailEntry;
77
78 private int flushed;
79
80 private int nioBufferCount;
81 private long nioBufferSize;
82
83 private boolean inFail;
84
85
86 private volatile long totalPendingSize;
87
88 @SuppressWarnings("UnusedDeclaration")
89 ChannelOutboundBuffer(EventExecutor executor) {
90 this.executor = executor;
91 }
92
93 private void incrementPendingOutboundBytes(long size) {
94 if (size == 0) {
95 return;
96 }
97
98
99 totalPendingSize += size;
100 }
101
102 private void decrementPendingOutboundBytes(long size) {
103 if (size == 0) {
104 return;
105 }
106
107
108 totalPendingSize -= size;
109 }
110
111
112
113
114
115 public void addMessage(Object msg, int size, Promise<Void> promise) {
116 assert executor.inEventLoop();
117 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
118 if (tailEntry == null) {
119 flushedEntry = null;
120 } else {
121 Entry tail = tailEntry;
122 tail.next = entry;
123 }
124 tailEntry = entry;
125 if (unflushedEntry == null) {
126 unflushedEntry = entry;
127 }
128
129
130
131 incrementPendingOutboundBytes(entry.pendingSize);
132 }
133
134
135
136
137
138 public void addFlush() {
139 assert executor.inEventLoop();
140
141
142
143
144
145 Entry entry = unflushedEntry;
146 if (entry != null) {
147 if (flushedEntry == null) {
148
149 flushedEntry = entry;
150 }
151
152 Entry prev = null;
153 do {
154 if (!entry.promise.setUncancellable()) {
155
156 int pending = entry.cancel();
157 if (prev == null) {
158
159 flushedEntry = entry.next;
160 } else {
161
162 prev.next = entry.next;
163 }
164 Entry next = entry.next;
165 entry.recycle();
166 entry = next;
167
168 decrementPendingOutboundBytes(pending);
169 } else {
170 flushed ++;
171 prev = entry;
172 entry = entry.next;
173 }
174 } while (entry != null);
175
176
177 unflushedEntry = null;
178 }
179 }
180
181 private static long total(Object msg) {
182 if (msg instanceof Buffer) {
183 return ((Buffer) msg).readableBytes();
184 }
185 if (msg instanceof FileRegion) {
186 return ((FileRegion) msg).count();
187 }
188 return -1;
189 }
190
191
192
193
194 public Object current() {
195 assert executor.inEventLoop();
196
197 Entry entry = flushedEntry;
198 if (entry == null) {
199 return null;
200 }
201
202 return entry.msg;
203 }
204
205
206
207
208
209 public long currentProgress() {
210 assert executor.inEventLoop();
211
212 Entry entry = flushedEntry;
213 if (entry == null) {
214 return 0;
215 }
216 return entry.progress;
217 }
218
219
220
221
222 public void progress(long amount) {
223 assert executor.inEventLoop();
224
225 Entry e = flushedEntry;
226 assert e != null;
227 e.progress += amount;
228 }
229
230
231
232
233
234
235 public boolean remove() {
236 assert executor.inEventLoop();
237
238 Entry e = flushedEntry;
239 if (e == null) {
240 clearNioBuffers();
241 return false;
242 }
243 Object msg = e.msg;
244
245 Promise<Void> promise = e.promise;
246 int size = e.pendingSize;
247
248 removeEntry(e);
249
250 if (!e.cancelled) {
251
252 SilentDispose.trySilentDispose(msg, logger);
253 safeSuccess(promise);
254 decrementPendingOutboundBytes(size);
255 }
256
257
258 e.recycle();
259
260 return true;
261 }
262
263
264
265
266
267
268 public boolean remove(Throwable cause) {
269 assert executor.inEventLoop();
270
271 Entry e = flushedEntry;
272 if (e == null) {
273 clearNioBuffers();
274 return false;
275 }
276 Object msg = e.msg;
277
278 Promise<Void> promise = e.promise;
279 int size = e.pendingSize;
280
281 removeEntry(e);
282
283 if (!e.cancelled) {
284
285 SilentDispose.trySilentDispose(msg, logger);
286
287 safeFail(promise, cause);
288 decrementPendingOutboundBytes(size);
289 }
290
291
292 e.recycle();
293
294 return true;
295 }
296
297 private void removeEntry(Entry e) {
298 assert executor.inEventLoop();
299
300 if (-- flushed == 0) {
301
302 flushedEntry = null;
303 if (e == tailEntry) {
304 tailEntry = null;
305 unflushedEntry = null;
306 }
307 } else {
308 flushedEntry = e.next;
309 }
310 }
311
312
313
314
315
316 public void removeBytes(long writtenBytes) {
317 assert executor.inEventLoop();
318
319 Object msg = current();
320 while (writtenBytes > 0 || hasZeroReadable(msg)) {
321 if (msg instanceof Buffer) {
322 Buffer buf = (Buffer) msg;
323 final int readableBytes = buf.readableBytes();
324 if (readableBytes <= writtenBytes) {
325 progress(readableBytes);
326 writtenBytes -= readableBytes;
327 remove();
328 } else {
329 buf.readSplit(Math.toIntExact(writtenBytes)).close();
330 progress(writtenBytes);
331 break;
332 }
333 } else {
334 break;
335 }
336 msg = current();
337 }
338 clearNioBuffers();
339 }
340
341 private static boolean hasZeroReadable(Object msg) {
342 if (msg instanceof Buffer) {
343 return ((Buffer) msg).readableBytes() == 0;
344 }
345 return false;
346 }
347
348
349
350 private void clearNioBuffers() {
351 int count = nioBufferCount;
352 if (count > 0) {
353 nioBufferCount = 0;
354 Arrays.fill(NIO_BUFFERS.get().buffers, 0, count, null);
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367 public ByteBuffer[] nioBuffers() {
368 assert executor.inEventLoop();
369
370 return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
371 }
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
387 assert executor.inEventLoop();
388
389 assert maxCount > 0;
390 assert maxBytes > 0;
391 long nioBufferSize = 0;
392 int nioBufferCount = 0;
393 BufferCache cache = NIO_BUFFERS.get();
394 cache.dataSize = 0;
395 cache.bufferCount = 0;
396 ByteBuffer[] nioBuffers = cache.buffers;
397
398 Entry entry = flushedEntry;
399 while (isFlushedEntry(entry) && entry.msg instanceof Buffer) {
400 if (!entry.cancelled) {
401 Buffer buf = (Buffer) entry.msg;
402 if (buf.readableBytes() > 0) {
403 int count = buf.forEachReadable(0, (index, component) -> {
404 ByteBuffer byteBuffer = component.readableBuffer();
405 if (cache.bufferCount > 0 && cache.dataSize + byteBuffer.remaining() > maxBytes) {
406
407
408
409
410
411
412
413
414
415
416
417
418 return false;
419 }
420 cache.dataSize += byteBuffer.remaining();
421 ByteBuffer[] buffers = cache.buffers;
422 int bufferCount = cache.bufferCount;
423 if (buffers.length == bufferCount && bufferCount < maxCount) {
424 buffers = cache.buffers = expandNioBufferArray(buffers, bufferCount + 1, bufferCount);
425 }
426 buffers[cache.bufferCount] = byteBuffer;
427 bufferCount++;
428 cache.bufferCount = bufferCount;
429 return bufferCount < maxCount;
430 });
431 if (count < 0) {
432 break;
433 }
434 }
435 }
436 entry = entry.next;
437 }
438 this.nioBufferCount = nioBufferCount + cache.bufferCount;
439 this.nioBufferSize = nioBufferSize + cache.dataSize;
440
441 return nioBuffers;
442 }
443
444 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
445 int newCapacity = array.length;
446 do {
447
448
449 newCapacity <<= 1;
450
451 if (newCapacity < 0) {
452 throw new IllegalStateException();
453 }
454
455 } while (neededSpace > newCapacity);
456
457 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
458 System.arraycopy(array, 0, newArray, 0, size);
459
460 return newArray;
461 }
462
463
464
465
466
467
468 public int nioBufferCount() {
469 assert executor.inEventLoop();
470
471 return nioBufferCount;
472 }
473
474
475
476
477
478
479 public long nioBufferSize() {
480 assert executor.inEventLoop();
481
482 return nioBufferSize;
483 }
484
485
486
487
488 public int size() {
489 assert executor.inEventLoop();
490
491 return flushed;
492 }
493
494
495
496
497
498 public boolean isEmpty() {
499 assert executor.inEventLoop();
500
501 return flushed == 0;
502 }
503
504 void failFlushedAndClose(Throwable failCause, Throwable closeCause) {
505 assert executor.inEventLoop();
506
507 failFlushed(failCause);
508 close(closeCause);
509 }
510
511 void failFlushed(Throwable cause) {
512 assert executor.inEventLoop();
513
514
515
516
517
518
519 if (inFail) {
520 return;
521 }
522
523 try {
524 inFail = true;
525 for (;;) {
526 if (!remove(cause)) {
527 break;
528 }
529 }
530 } finally {
531 inFail = false;
532 }
533 }
534
535 private void close(final Throwable cause) {
536 assert executor.inEventLoop();
537
538 if (inFail) {
539 executor.execute(() -> close(cause));
540 return;
541 }
542
543 inFail = true;
544
545 if (!isEmpty()) {
546 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
547 }
548
549
550 try {
551 Entry e = unflushedEntry;
552 while (e != null) {
553 int size = e.pendingSize;
554
555 decrementPendingOutboundBytes(size);
556
557 if (!e.cancelled) {
558 SilentDispose.dispose(e.msg, logger);
559 safeFail(e.promise, cause);
560 }
561 e = e.recycleAndGetNext();
562 }
563 } finally {
564 inFail = false;
565 }
566 clearNioBuffers();
567 }
568
569 private static void safeSuccess(Promise<Void> promise) {
570 PromiseNotificationUtil.trySuccess(promise, null, logger);
571 }
572
573 private static void safeFail(Promise<Void> promise, Throwable cause) {
574 PromiseNotificationUtil.tryFailure(promise, cause, logger);
575 }
576
577 public long totalPendingWriteBytes() {
578 return totalPendingSize;
579 }
580
581
582
583
584
585 public <T extends Exception> void forEachFlushedMessage(MessageProcessor<T> processor) throws T {
586 assert executor.inEventLoop();
587
588 requireNonNull(processor, "processor");
589
590 Entry entry = flushedEntry;
591 if (entry == null) {
592 return;
593 }
594
595 do {
596 if (!entry.cancelled) {
597 if (!processor.processMessage(entry.msg)) {
598 return;
599 }
600 }
601 entry = entry.next;
602 } while (isFlushedEntry(entry));
603 }
604
605 private boolean isFlushedEntry(Entry e) {
606 return e != null && e != unflushedEntry;
607 }
608
609 public interface MessageProcessor<T extends Exception> {
610
611
612
613
614 boolean processMessage(Object msg) throws T;
615 }
616
617 private static final class Entry {
618 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(Entry::new);
619
620 private final Handle<Entry> handle;
621 Entry next;
622 Object msg;
623 ByteBuffer[] bufs;
624 ByteBuffer buf;
625 Promise<Void> promise;
626 long progress;
627 long total;
628 int pendingSize;
629 int count = -1;
630 boolean cancelled;
631
632 private Entry(Handle<Entry> handle) {
633 this.handle = handle;
634 }
635
636 static Entry newInstance(Object msg, int size, long total, Promise<Void> promise) {
637 Entry entry = RECYCLER.get();
638 entry.msg = msg;
639 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
640 entry.total = total;
641 entry.promise = promise;
642 return entry;
643 }
644
645 int cancel() {
646 if (!cancelled) {
647 cancelled = true;
648 int pSize = pendingSize;
649
650
651 SilentDispose.dispose(msg, logger);
652 msg = null;
653
654 pendingSize = 0;
655 total = 0;
656 progress = 0;
657 bufs = null;
658 buf = null;
659 return pSize;
660 }
661 return 0;
662 }
663
664 void recycle() {
665 next = null;
666 bufs = null;
667 buf = null;
668 msg = null;
669 promise = null;
670 progress = 0;
671 total = 0;
672 pendingSize = 0;
673 count = -1;
674 cancelled = false;
675 handle.recycle(this);
676 }
677
678 Entry recycleAndGetNext() {
679 Entry next = this.next;
680 recycle();
681 return next;
682 }
683 }
684
685
686
687
688 private static final class BufferCache {
689 ByteBuffer[] buffers;
690 long dataSize;
691 int bufferCount;
692 }
693 }