1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.AbstractReferenceCountedByteBuf;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.socket.nio.NioSocketChannel;
23 import io.netty.util.Recycler;
24 import io.netty.util.Recycler.EnhancedHandle;
25 import io.netty.util.ReferenceCountUtil;
26 import io.netty.util.concurrent.FastThreadLocal;
27 import io.netty.util.internal.InternalThreadLocalMap;
28 import io.netty.util.internal.ObjectPool.Handle;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PromiseNotificationUtil;
31 import io.netty.util.internal.SystemPropertyUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.nio.ByteBuffer;
36 import java.nio.channels.ClosedChannelException;
37 import java.util.Arrays;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
40
41 import static java.lang.Math.min;
42
43
44
45
46
47
48
49
50
51
52
53
54 public final class ChannelOutboundBuffer {
55
56
57
58
59
60
61
62 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
63 SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
64
65 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
66
67 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
68 @Override
69 protected ByteBuffer[] initialValue() throws Exception {
70 return new ByteBuffer[1024];
71 }
72 };
73
74 private final Channel channel;
75
76
77
78
79 private Entry flushedEntry;
80
81 private Entry unflushedEntry;
82
83 private Entry tailEntry;
84
85 private int flushed;
86
87 private int nioBufferCount;
88 private long nioBufferSize;
89
90 private boolean inFail;
91
92 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
93 AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
94
95 @SuppressWarnings("UnusedDeclaration")
96 private volatile long totalPendingSize;
97
98 private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
99 AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
100
101 @SuppressWarnings("UnusedDeclaration")
102 private volatile int unwritable;
103
104 private volatile Runnable fireChannelWritabilityChangedTask;
105
106 ChannelOutboundBuffer(AbstractChannel channel) {
107 this.channel = channel;
108 }
109
110
111
112
113
114 public void addMessage(Object msg, int size, ChannelPromise promise) {
115 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
116 if (tailEntry == null) {
117 flushedEntry = null;
118 } else {
119 Entry tail = tailEntry;
120 tail.next = entry;
121 }
122 tailEntry = entry;
123 if (unflushedEntry == null) {
124 unflushedEntry = entry;
125 }
126
127
128
129
130
131 if (msg instanceof AbstractReferenceCountedByteBuf) {
132 ((AbstractReferenceCountedByteBuf) msg).touch();
133 } else {
134 ReferenceCountUtil.touch(msg);
135 }
136
137
138
139 incrementPendingOutboundBytes(entry.pendingSize, false);
140 }
141
142
143
144
145
146 public void addFlush() {
147
148
149
150
151 Entry entry = unflushedEntry;
152 if (entry != null) {
153 if (flushedEntry == null) {
154
155 flushedEntry = entry;
156 }
157 do {
158 flushed ++;
159 if (!entry.promise.setUncancellable()) {
160
161 int pending = entry.cancel();
162 decrementPendingOutboundBytes(pending, false, true);
163 }
164 entry = entry.next;
165 } while (entry != null);
166
167
168 unflushedEntry = null;
169 }
170 }
171
172
173
174
175
176 void incrementPendingOutboundBytes(long size) {
177 incrementPendingOutboundBytes(size, true);
178 }
179
180 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
181 if (size == 0) {
182 return;
183 }
184
185 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
186 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
187 setUnwritable(invokeLater);
188 }
189 }
190
191
192
193
194
195 void decrementPendingOutboundBytes(long size) {
196 decrementPendingOutboundBytes(size, true, true);
197 }
198
199 private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
200 if (size == 0) {
201 return;
202 }
203
204 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
205 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
206 setWritable(invokeLater);
207 }
208 }
209
210 private static long total(Object msg) {
211 if (msg instanceof ByteBuf) {
212 return ((ByteBuf) msg).readableBytes();
213 }
214 if (msg instanceof FileRegion) {
215 return ((FileRegion) msg).count();
216 }
217 if (msg instanceof ByteBufHolder) {
218 return ((ByteBufHolder) msg).content().readableBytes();
219 }
220 return -1;
221 }
222
223
224
225
226 public Object current() {
227 Entry entry = flushedEntry;
228 if (entry == null) {
229 return null;
230 }
231
232 return entry.msg;
233 }
234
235
236
237
238
239 public long currentProgress() {
240 Entry entry = flushedEntry;
241 if (entry == null) {
242 return 0;
243 }
244 return entry.progress;
245 }
246
247
248
249
250 public void progress(long amount) {
251 Entry e = flushedEntry;
252 assert e != null;
253 ChannelPromise p = e.promise;
254 long progress = e.progress + amount;
255 e.progress = progress;
256 assert p != null;
257 final Class<?> promiseClass = p.getClass();
258
259 if (promiseClass == VoidChannelPromise.class || promiseClass == DefaultChannelPromise.class) {
260 return;
261 }
262
263 if (p instanceof DefaultChannelProgressivePromise) {
264 ((DefaultChannelProgressivePromise) p).tryProgress(progress, e.total);
265 } else if (p instanceof ChannelProgressivePromise) {
266 ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
267 }
268 }
269
270
271
272
273
274
275 public boolean remove() {
276 Entry e = flushedEntry;
277 if (e == null) {
278 clearNioBuffers();
279 return false;
280 }
281 Object msg = e.msg;
282
283 ChannelPromise promise = e.promise;
284 int size = e.pendingSize;
285
286 removeEntry(e);
287
288
289 if (!e.cancelled) {
290
291
292 if (msg instanceof AbstractReferenceCountedByteBuf) {
293 try {
294
295 ((AbstractReferenceCountedByteBuf) msg).release();
296 } catch (Throwable t) {
297 logger.warn("Failed to release a ByteBuf: {}", msg, t);
298 }
299 } else {
300 ReferenceCountUtil.safeRelease(msg);
301 }
302 safeSuccess(promise);
303 decrementPendingOutboundBytes(size, false, true);
304 }
305
306
307 e.unguardedRecycle();
308
309 return true;
310 }
311
312
313
314
315
316
317 public boolean remove(Throwable cause) {
318 return remove0(cause, true);
319 }
320
321 private boolean remove0(Throwable cause, boolean notifyWritability) {
322 Entry e = flushedEntry;
323 if (e == null) {
324 clearNioBuffers();
325 return false;
326 }
327 Object msg = e.msg;
328
329 ChannelPromise promise = e.promise;
330 int size = e.pendingSize;
331
332 removeEntry(e);
333
334 if (!e.cancelled) {
335
336 ReferenceCountUtil.safeRelease(msg);
337
338 safeFail(promise, cause);
339 decrementPendingOutboundBytes(size, false, notifyWritability);
340 }
341
342
343 e.unguardedRecycle();
344
345 return true;
346 }
347
348 private void removeEntry(Entry e) {
349 if (-- flushed == 0) {
350
351 flushedEntry = null;
352 if (e == tailEntry) {
353 tailEntry = null;
354 unflushedEntry = null;
355 }
356 } else {
357 flushedEntry = e.next;
358 }
359 }
360
361
362
363
364
365 public void removeBytes(long writtenBytes) {
366 for (;;) {
367 Object msg = current();
368 if (!(msg instanceof ByteBuf)) {
369 assert writtenBytes == 0;
370 break;
371 }
372
373 final ByteBuf buf = (ByteBuf) msg;
374 final int readerIndex = buf.readerIndex();
375 final int readableBytes = buf.writerIndex() - readerIndex;
376
377 if (readableBytes <= writtenBytes) {
378 if (writtenBytes != 0) {
379 progress(readableBytes);
380 writtenBytes -= readableBytes;
381 }
382 remove();
383 } else {
384 if (writtenBytes != 0) {
385 buf.readerIndex(readerIndex + (int) writtenBytes);
386 progress(writtenBytes);
387 }
388 break;
389 }
390 }
391 clearNioBuffers();
392 }
393
394
395
396 private void clearNioBuffers() {
397 int count = nioBufferCount;
398 if (count > 0) {
399 nioBufferCount = 0;
400 Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
401 }
402 }
403
404
405
406
407
408
409
410
411
412
413
414 public ByteBuffer[] nioBuffers() {
415 return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
416 }
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
433 assert maxCount > 0;
434 assert maxBytes > 0;
435 long nioBufferSize = 0;
436 int nioBufferCount = 0;
437 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
438 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
439 Entry entry = flushedEntry;
440 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
441 if (!entry.cancelled) {
442 ByteBuf buf = (ByteBuf) entry.msg;
443 final int readerIndex = buf.readerIndex();
444 final int readableBytes = buf.writerIndex() - readerIndex;
445
446 if (readableBytes > 0) {
447 if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
448
449
450
451
452
453
454
455
456
457
458
459 break;
460 }
461 nioBufferSize += readableBytes;
462 int count = entry.count;
463 if (count == -1) {
464
465 entry.count = count = buf.nioBufferCount();
466 }
467 int neededSpace = min(maxCount, nioBufferCount + count);
468 if (neededSpace > nioBuffers.length) {
469 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
470 NIO_BUFFERS.set(threadLocalMap, nioBuffers);
471 }
472 if (count == 1) {
473 ByteBuffer nioBuf = entry.buf;
474 if (nioBuf == null) {
475
476
477 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
478 }
479 nioBuffers[nioBufferCount++] = nioBuf;
480 } else {
481
482
483 nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
484 }
485 if (nioBufferCount >= maxCount) {
486 break;
487 }
488 }
489 }
490 entry = entry.next;
491 }
492 this.nioBufferCount = nioBufferCount;
493 this.nioBufferSize = nioBufferSize;
494
495 return nioBuffers;
496 }
497
498 private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
499 ByteBuffer[] nioBufs = entry.bufs;
500 if (nioBufs == null) {
501
502
503 entry.bufs = nioBufs = buf.nioBuffers();
504 }
505 for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
506 ByteBuffer nioBuf = nioBufs[i];
507 if (nioBuf == null) {
508 break;
509 } else if (!nioBuf.hasRemaining()) {
510 continue;
511 }
512 nioBuffers[nioBufferCount++] = nioBuf;
513 }
514 return nioBufferCount;
515 }
516
517 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
518 int newCapacity = array.length;
519 do {
520
521
522 newCapacity <<= 1;
523
524 if (newCapacity < 0) {
525 throw new IllegalStateException();
526 }
527
528 } while (neededSpace > newCapacity);
529
530 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
531 System.arraycopy(array, 0, newArray, 0, size);
532
533 return newArray;
534 }
535
536
537
538
539
540
541 public int nioBufferCount() {
542 return nioBufferCount;
543 }
544
545
546
547
548
549
550 public long nioBufferSize() {
551 return nioBufferSize;
552 }
553
554
555
556
557
558
559
560 public boolean isWritable() {
561 return unwritable == 0;
562 }
563
564
565
566
567
568 public boolean getUserDefinedWritability(int index) {
569 return (unwritable & writabilityMask(index)) == 0;
570 }
571
572
573
574
575 public void setUserDefinedWritability(int index, boolean writable) {
576 if (writable) {
577 setUserDefinedWritability(index);
578 } else {
579 clearUserDefinedWritability(index);
580 }
581 }
582
583 private void setUserDefinedWritability(int index) {
584 final int mask = ~writabilityMask(index);
585 for (;;) {
586 final int oldValue = unwritable;
587 final int newValue = oldValue & mask;
588 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
589 if (oldValue != 0 && newValue == 0) {
590 fireChannelWritabilityChanged(true);
591 }
592 break;
593 }
594 }
595 }
596
597 private void clearUserDefinedWritability(int index) {
598 final int mask = writabilityMask(index);
599 for (;;) {
600 final int oldValue = unwritable;
601 final int newValue = oldValue | mask;
602 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
603 if (oldValue == 0 && newValue != 0) {
604 fireChannelWritabilityChanged(true);
605 }
606 break;
607 }
608 }
609 }
610
611 private static int writabilityMask(int index) {
612 if (index < 1 || index > 31) {
613 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
614 }
615 return 1 << index;
616 }
617
618 private void setWritable(boolean invokeLater) {
619 for (;;) {
620 final int oldValue = unwritable;
621 final int newValue = oldValue & ~1;
622 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
623 if (oldValue != 0 && newValue == 0) {
624 fireChannelWritabilityChanged(invokeLater);
625 }
626 break;
627 }
628 }
629 }
630
631 private void setUnwritable(boolean invokeLater) {
632 for (;;) {
633 final int oldValue = unwritable;
634 final int newValue = oldValue | 1;
635 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
636 if (oldValue == 0) {
637 fireChannelWritabilityChanged(invokeLater);
638 }
639 break;
640 }
641 }
642 }
643
644 private void fireChannelWritabilityChanged(boolean invokeLater) {
645 final ChannelPipeline pipeline = channel.pipeline();
646 if (invokeLater) {
647 Runnable task = fireChannelWritabilityChangedTask;
648 if (task == null) {
649 fireChannelWritabilityChangedTask = task = new Runnable() {
650 @Override
651 public void run() {
652 pipeline.fireChannelWritabilityChanged();
653 }
654 };
655 }
656 channel.eventLoop().execute(task);
657 } else {
658 pipeline.fireChannelWritabilityChanged();
659 }
660 }
661
662
663
664
665 public int size() {
666 return flushed;
667 }
668
669
670
671
672
673 public boolean isEmpty() {
674 return flushed == 0;
675 }
676
677 void failFlushed(Throwable cause, boolean notify) {
678
679
680
681
682
683 if (inFail) {
684 return;
685 }
686
687 try {
688 inFail = true;
689 for (;;) {
690 if (!remove0(cause, notify)) {
691 break;
692 }
693 }
694 } finally {
695 inFail = false;
696 }
697 }
698
699 void close(final Throwable cause, final boolean allowChannelOpen) {
700 if (inFail) {
701 channel.eventLoop().execute(new Runnable() {
702 @Override
703 public void run() {
704 close(cause, allowChannelOpen);
705 }
706 });
707 return;
708 }
709
710 inFail = true;
711
712 if (!allowChannelOpen && channel.isOpen()) {
713 throw new IllegalStateException("close() must be invoked after the channel is closed.");
714 }
715
716 if (!isEmpty()) {
717 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
718 }
719
720
721 try {
722 Entry e = unflushedEntry;
723 while (e != null) {
724
725 int size = e.pendingSize;
726 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
727
728 if (!e.cancelled) {
729 ReferenceCountUtil.safeRelease(e.msg);
730 safeFail(e.promise, cause);
731 }
732 e = e.unguardedRecycleAndGetNext();
733 }
734 } finally {
735 inFail = false;
736 }
737 clearNioBuffers();
738 }
739
740 void close(ClosedChannelException cause) {
741 close(cause, false);
742 }
743
744 private static void safeSuccess(ChannelPromise promise) {
745
746
747 PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
748 }
749
750 private static void safeFail(ChannelPromise promise, Throwable cause) {
751
752
753 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
754 }
755
756 @Deprecated
757 public void recycle() {
758
759 }
760
761 public long totalPendingWriteBytes() {
762 return totalPendingSize;
763 }
764
765
766
767
768
769 public long bytesBeforeUnwritable() {
770
771 long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
772
773
774
775 return bytes > 0 && isWritable() ? bytes : 0;
776 }
777
778
779
780
781
782 public long bytesBeforeWritable() {
783
784 long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark() + 1;
785
786
787
788 return bytes <= 0 || isWritable() ? 0 : bytes;
789 }
790
791
792
793
794
795
796 public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
797 ObjectUtil.checkNotNull(processor, "processor");
798
799 Entry entry = flushedEntry;
800 if (entry == null) {
801 return;
802 }
803
804 do {
805 if (!entry.cancelled) {
806 if (!processor.processMessage(entry.msg)) {
807 return;
808 }
809 }
810 entry = entry.next;
811 } while (isFlushedEntry(entry));
812 }
813
814 private boolean isFlushedEntry(Entry e) {
815 return e != null && e != unflushedEntry;
816 }
817
818 public interface MessageProcessor {
819
820
821
822
823 boolean processMessage(Object msg) throws Exception;
824 }
825
826 static final class Entry {
827 private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
828 @Override
829 protected Entry newObject(Handle<Entry> handle) {
830 return new Entry(handle);
831 }
832 };
833
834 private final EnhancedHandle<Entry> handle;
835 Entry next;
836 Object msg;
837 ByteBuffer[] bufs;
838 ByteBuffer buf;
839 ChannelPromise promise;
840 long progress;
841 long total;
842 int pendingSize;
843 int count = -1;
844 boolean cancelled;
845
846 private Entry(Handle<Entry> handle) {
847 this.handle = (EnhancedHandle<Entry>) handle;
848 }
849
850 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
851 Entry entry = RECYCLER.get();
852 entry.msg = msg;
853 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
854 entry.total = total;
855 entry.promise = promise;
856 return entry;
857 }
858
859 int cancel() {
860 if (!cancelled) {
861 cancelled = true;
862 int pSize = pendingSize;
863
864
865 ReferenceCountUtil.safeRelease(msg);
866 msg = Unpooled.EMPTY_BUFFER;
867
868 pendingSize = 0;
869 total = 0;
870 progress = 0;
871 bufs = null;
872 buf = null;
873 return pSize;
874 }
875 return 0;
876 }
877
878 void unguardedRecycle() {
879 next = null;
880 bufs = null;
881 buf = null;
882 msg = null;
883 promise = null;
884 progress = 0;
885 total = 0;
886 pendingSize = 0;
887 count = -1;
888 cancelled = false;
889 handle.unguardedRecycle(this);
890 }
891
892 Entry unguardedRecycleAndGetNext() {
893 Entry next = this.next;
894 unguardedRecycle();
895 return next;
896 }
897 }
898 }