1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.Recycler;
23 import io.netty.util.Recycler.Handle;
24 import io.netty.util.ReferenceCountUtil;
25 import io.netty.util.concurrent.FastThreadLocal;
26 import io.netty.util.internal.InternalThreadLocalMap;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.PromiseNotificationUtil;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ClosedChannelException;
35 import java.util.Arrays;
36 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public final class ChannelOutboundBuffer {
52
53
54
55
56
57
58
// ---------------------------------------------------------------------------------------------
// Static state
// ---------------------------------------------------------------------------------------------

/**
 * Constant number of bytes that {@code Entry.newInstance(...)} adds to the caller-supplied size of
 * every entry. Taken from the {@code io.netty.transport.outboundBufferEntrySizeOverhead} system
 * property, falling back to 96.
 */
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);

/** Per-thread scratch array filled and returned by {@link #nioBuffers()}; starts with 1024 slots. */
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
    @Override
    protected ByteBuffer[] initialValue() throws Exception {
        return new ByteBuffer[1024];
    }
};

/** Atomic accessor for {@link #totalPendingSize}. */
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

/** Atomic accessor for {@link #unwritable}. */
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// ---------------------------------------------------------------------------------------------
// Instance state
//
// The entries form a singly linked list:
//   flushedEntry --> ... --> unflushedEntry --> ... --> tailEntry
// ---------------------------------------------------------------------------------------------

/** Channel whose config, pipeline and event loop this buffer consults. */
private final Channel channel;

/** First entry that has been marked as flushed and not yet removed, or {@code null}. */
private Entry flushedEntry;

/** First entry that has been added but not yet marked as flushed, or {@code null}. */
private Entry unflushedEntry;

/** Last entry of the linked list, or {@code null} when the list is empty. */
private Entry tailEntry;

/** Number of flushed entries that have not been removed yet. */
private int flushed;

/** Number of slots of the thread-local array populated by the last {@link #nioBuffers()} call. */
private int nioBufferCount;

/** Total readable bytes gathered by the last {@link #nioBuffers()} call. */
private long nioBufferSize;

/** Set while {@code failFlushed(...)} or {@code close(...)} is running; used to detect re-entrance. */
private boolean inFail;

/** Sum of the pending sizes of all queued entries; only modified through its field updater. */
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

/**
 * Bit set of "not writable" reasons. Bit 0 is driven by the water-mark checks, bits 1..31 are the
 * user-defined flags. The buffer is writable only when the whole value is 0.
 */
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;

/** Lazily created task that fires {@code channelWritabilityChanged} from the event loop. */
private volatile Runnable fireChannelWritabilityChangedTask;

/**
 * Creates an empty outbound buffer bound to the given channel.
 *
 * @param channel the channel this buffer belongs to
 */
ChannelOutboundBuffer(AbstractChannel channel) {
    this.channel = channel;
}
106
107
108
109
110
111 public void addMessage(Object msg, int size, ChannelPromise promise) {
112 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
113 if (tailEntry == null) {
114 flushedEntry = null;
115 tailEntry = entry;
116 } else {
117 Entry tail = tailEntry;
118 tail.next = entry;
119 tailEntry = entry;
120 }
121 if (unflushedEntry == null) {
122 unflushedEntry = entry;
123 }
124
125
126
127 incrementPendingOutboundBytes(entry.pendingSize, false);
128 }
129
130
131
132
133
134 public void addFlush() {
135
136
137
138
139 Entry entry = unflushedEntry;
140 if (entry != null) {
141 if (flushedEntry == null) {
142
143 flushedEntry = entry;
144 }
145 do {
146 flushed ++;
147 if (!entry.promise.setUncancellable()) {
148
149 int pending = entry.cancel();
150 decrementPendingOutboundBytes(pending, false, true);
151 }
152 entry = entry.next;
153 } while (entry != null);
154
155
156 unflushedEntry = null;
157 }
158 }
159
160
161
162
163
164 void incrementPendingOutboundBytes(long size) {
165 incrementPendingOutboundBytes(size, true);
166 }
167
168 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
169 if (size == 0) {
170 return;
171 }
172
173 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
174 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
175 setUnwritable(invokeLater);
176 }
177 }
178
179
180
181
182
183 void decrementPendingOutboundBytes(long size) {
184 decrementPendingOutboundBytes(size, true, true);
185 }
186
187 private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
188 if (size == 0) {
189 return;
190 }
191
192 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
193 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
194 setWritable(invokeLater);
195 }
196 }
197
198 private static long total(Object msg) {
199 if (msg instanceof ByteBuf) {
200 return ((ByteBuf) msg).readableBytes();
201 }
202 if (msg instanceof FileRegion) {
203 return ((FileRegion) msg).count();
204 }
205 if (msg instanceof ByteBufHolder) {
206 return ((ByteBufHolder) msg).content().readableBytes();
207 }
208 return -1;
209 }
210
211
212
213
214 public Object current() {
215 Entry entry = flushedEntry;
216 if (entry == null) {
217 return null;
218 }
219
220 return entry.msg;
221 }
222
223
224
225
226 public void progress(long amount) {
227 Entry e = flushedEntry;
228 assert e != null;
229 ChannelPromise p = e.promise;
230 if (p instanceof ChannelProgressivePromise) {
231 long progress = e.progress + amount;
232 e.progress = progress;
233 ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
234 }
235 }
236
237
238
239
240
241
242 public boolean remove() {
243 Entry e = flushedEntry;
244 if (e == null) {
245 clearNioBuffers();
246 return false;
247 }
248 Object msg = e.msg;
249
250 ChannelPromise promise = e.promise;
251 int size = e.pendingSize;
252
253 removeEntry(e);
254
255 if (!e.cancelled) {
256
257 ReferenceCountUtil.safeRelease(msg);
258 safeSuccess(promise);
259 decrementPendingOutboundBytes(size, false, true);
260 }
261
262
263 e.recycle();
264
265 return true;
266 }
267
268
269
270
271
272
273 public boolean remove(Throwable cause) {
274 return remove0(cause, true);
275 }
276
277 private boolean remove0(Throwable cause, boolean notifyWritability) {
278 Entry e = flushedEntry;
279 if (e == null) {
280 clearNioBuffers();
281 return false;
282 }
283 Object msg = e.msg;
284
285 ChannelPromise promise = e.promise;
286 int size = e.pendingSize;
287
288 removeEntry(e);
289
290 if (!e.cancelled) {
291
292 ReferenceCountUtil.safeRelease(msg);
293
294 safeFail(promise, cause);
295 decrementPendingOutboundBytes(size, false, notifyWritability);
296 }
297
298
299 e.recycle();
300
301 return true;
302 }
303
304 private void removeEntry(Entry e) {
305 if (-- flushed == 0) {
306
307 flushedEntry = null;
308 if (e == tailEntry) {
309 tailEntry = null;
310 unflushedEntry = null;
311 }
312 } else {
313 flushedEntry = e.next;
314 }
315 }
316
317
318
319
320
321 public void removeBytes(long writtenBytes) {
322 for (;;) {
323 Object msg = current();
324 if (!(msg instanceof ByteBuf)) {
325 assert writtenBytes == 0;
326 break;
327 }
328
329 final ByteBuf buf = (ByteBuf) msg;
330 final int readerIndex = buf.readerIndex();
331 final int readableBytes = buf.writerIndex() - readerIndex;
332
333 if (readableBytes <= writtenBytes) {
334 if (writtenBytes != 0) {
335 progress(readableBytes);
336 writtenBytes -= readableBytes;
337 }
338 remove();
339 } else {
340 if (writtenBytes != 0) {
341 buf.readerIndex(readerIndex + (int) writtenBytes);
342 progress(writtenBytes);
343 }
344 break;
345 }
346 }
347 clearNioBuffers();
348 }
349
350
351
352 private void clearNioBuffers() {
353 int count = nioBufferCount;
354 if (count > 0) {
355 nioBufferCount = 0;
356 Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
357 }
358 }
359
360
361
362
363
364
365
366
367
368
369
370 public ByteBuffer[] nioBuffers() {
371 long nioBufferSize = 0;
372 int nioBufferCount = 0;
373 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
374 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
375 Entry entry = flushedEntry;
376 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
377 if (!entry.cancelled) {
378 ByteBuf buf = (ByteBuf) entry.msg;
379 final int readerIndex = buf.readerIndex();
380 final int readableBytes = buf.writerIndex() - readerIndex;
381
382 if (readableBytes > 0) {
383 if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
384
385
386
387
388
389
390
391
392
393
394 break;
395 }
396 nioBufferSize += readableBytes;
397 int count = entry.count;
398 if (count == -1) {
399
400 entry.count = count = buf.nioBufferCount();
401 }
402 int neededSpace = nioBufferCount + count;
403 if (neededSpace > nioBuffers.length) {
404 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
405 NIO_BUFFERS.set(threadLocalMap, nioBuffers);
406 }
407 if (count == 1) {
408 ByteBuffer nioBuf = entry.buf;
409 if (nioBuf == null) {
410
411
412 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
413 }
414 nioBuffers[nioBufferCount ++] = nioBuf;
415 } else {
416 ByteBuffer[] nioBufs = entry.bufs;
417 if (nioBufs == null) {
418
419
420 entry.bufs = nioBufs = buf.nioBuffers();
421 }
422 nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
423 }
424 }
425 }
426 entry = entry.next;
427 }
428 this.nioBufferCount = nioBufferCount;
429 this.nioBufferSize = nioBufferSize;
430
431 return nioBuffers;
432 }
433
434 private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
435 for (ByteBuffer nioBuf: nioBufs) {
436 if (nioBuf == null) {
437 break;
438 }
439 nioBuffers[nioBufferCount ++] = nioBuf;
440 }
441 return nioBufferCount;
442 }
443
444 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
445 int newCapacity = array.length;
446 do {
447
448
449 newCapacity <<= 1;
450
451 if (newCapacity < 0) {
452 throw new IllegalStateException();
453 }
454
455 } while (neededSpace > newCapacity);
456
457 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
458 System.arraycopy(array, 0, newArray, 0, size);
459
460 return newArray;
461 }
462
463
464
465
466
467
468 public int nioBufferCount() {
469 return nioBufferCount;
470 }
471
472
473
474
475
476
477 public long nioBufferSize() {
478 return nioBufferSize;
479 }
480
481
482
483
484
485
486
487 public boolean isWritable() {
488 return unwritable == 0;
489 }
490
491
492
493
494
495 public boolean getUserDefinedWritability(int index) {
496 return (unwritable & writabilityMask(index)) == 0;
497 }
498
499
500
501
502 public void setUserDefinedWritability(int index, boolean writable) {
503 if (writable) {
504 setUserDefinedWritability(index);
505 } else {
506 clearUserDefinedWritability(index);
507 }
508 }
509
510 private void setUserDefinedWritability(int index) {
511 final int mask = ~writabilityMask(index);
512 for (;;) {
513 final int oldValue = unwritable;
514 final int newValue = oldValue & mask;
515 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
516 if (oldValue != 0 && newValue == 0) {
517 fireChannelWritabilityChanged(true);
518 }
519 break;
520 }
521 }
522 }
523
524 private void clearUserDefinedWritability(int index) {
525 final int mask = writabilityMask(index);
526 for (;;) {
527 final int oldValue = unwritable;
528 final int newValue = oldValue | mask;
529 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
530 if (oldValue == 0 && newValue != 0) {
531 fireChannelWritabilityChanged(true);
532 }
533 break;
534 }
535 }
536 }
537
538 private static int writabilityMask(int index) {
539 if (index < 1 || index > 31) {
540 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
541 }
542 return 1 << index;
543 }
544
545 private void setWritable(boolean invokeLater) {
546 for (;;) {
547 final int oldValue = unwritable;
548 final int newValue = oldValue & ~1;
549 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
550 if (oldValue != 0 && newValue == 0) {
551 fireChannelWritabilityChanged(invokeLater);
552 }
553 break;
554 }
555 }
556 }
557
558 private void setUnwritable(boolean invokeLater) {
559 for (;;) {
560 final int oldValue = unwritable;
561 final int newValue = oldValue | 1;
562 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
563 if (oldValue == 0 && newValue != 0) {
564 fireChannelWritabilityChanged(invokeLater);
565 }
566 break;
567 }
568 }
569 }
570
571 private void fireChannelWritabilityChanged(boolean invokeLater) {
572 final ChannelPipeline pipeline = channel.pipeline();
573 if (invokeLater) {
574 Runnable task = fireChannelWritabilityChangedTask;
575 if (task == null) {
576 fireChannelWritabilityChangedTask = task = new Runnable() {
577 @Override
578 public void run() {
579 pipeline.fireChannelWritabilityChanged();
580 }
581 };
582 }
583 channel.eventLoop().execute(task);
584 } else {
585 pipeline.fireChannelWritabilityChanged();
586 }
587 }
588
589
590
591
592 public int size() {
593 return flushed;
594 }
595
596
597
598
599
600 public boolean isEmpty() {
601 return flushed == 0;
602 }
603
604 void failFlushed(Throwable cause, boolean notify) {
605
606
607
608
609
610 if (inFail) {
611 return;
612 }
613
614 try {
615 inFail = true;
616 for (;;) {
617 if (!remove0(cause, notify)) {
618 break;
619 }
620 }
621 } finally {
622 inFail = false;
623 }
624 }
625
626 void close(final Throwable cause, final boolean allowChannelOpen) {
627 if (inFail) {
628 channel.eventLoop().execute(new Runnable() {
629 @Override
630 public void run() {
631 close(cause, allowChannelOpen);
632 }
633 });
634 return;
635 }
636
637 inFail = true;
638
639 if (!allowChannelOpen && channel.isOpen()) {
640 throw new IllegalStateException("close() must be invoked after the channel is closed.");
641 }
642
643 if (!isEmpty()) {
644 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
645 }
646
647
648 try {
649 Entry e = unflushedEntry;
650 while (e != null) {
651
652 int size = e.pendingSize;
653 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
654
655 if (!e.cancelled) {
656 ReferenceCountUtil.safeRelease(e.msg);
657 safeFail(e.promise, cause);
658 }
659 e = e.recycleAndGetNext();
660 }
661 } finally {
662 inFail = false;
663 }
664 clearNioBuffers();
665 }
666
667 void close(ClosedChannelException cause) {
668 close(cause, false);
669 }
670
671 private static void safeSuccess(ChannelPromise promise) {
672
673
674 PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
675 }
676
677 private static void safeFail(ChannelPromise promise, Throwable cause) {
678
679
680 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
681 }
682
683 @Deprecated
684 public void recycle() {
685
686 }
687
688 public long totalPendingWriteBytes() {
689 return totalPendingSize;
690 }
691
692
693
694
695
696 public long bytesBeforeUnwritable() {
697 long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
698
699
700
701 if (bytes > 0) {
702 return isWritable() ? bytes : 0;
703 }
704 return 0;
705 }
706
707
708
709
710
711 public long bytesBeforeWritable() {
712 long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
713
714
715
716 if (bytes > 0) {
717 return isWritable() ? 0 : bytes;
718 }
719 return 0;
720 }
721
722
723
724
725
726
727 public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
728 if (processor == null) {
729 throw new NullPointerException("processor");
730 }
731
732 Entry entry = flushedEntry;
733 if (entry == null) {
734 return;
735 }
736
737 do {
738 if (!entry.cancelled) {
739 if (!processor.processMessage(entry.msg)) {
740 return;
741 }
742 }
743 entry = entry.next;
744 } while (isFlushedEntry(entry));
745 }
746
747 private boolean isFlushedEntry(Entry e) {
748 return e != null && e != unflushedEntry;
749 }
750
/**
 * Callback used by {@link #forEachFlushedMessage(MessageProcessor)} to visit flushed messages.
 */
public interface MessageProcessor {
    /**
     * Invoked for each flushed message that has not been cancelled.
     *
     * @param msg the message to process
     * @return {@code true} to continue with the next message, {@code false} to stop the iteration
     * @throws Exception if processing fails; the exception propagates to the caller of the iteration
     */
    boolean processMessage(Object msg) throws Exception;
}
758
759 static final class Entry {
760 private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
761 @Override
762 protected Entry newObject(Handle handle) {
763 return new Entry(handle);
764 }
765 };
766
767 private final Handle handle;
768 Entry next;
769 Object msg;
770 ByteBuffer[] bufs;
771 ByteBuffer buf;
772 ChannelPromise promise;
773 long progress;
774 long total;
775 int pendingSize;
776 int count = -1;
777 boolean cancelled;
778
779 private Entry(Handle handle) {
780 this.handle = handle;
781 }
782
783 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
784 Entry entry = RECYCLER.get();
785 entry.msg = msg;
786 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
787 entry.total = total;
788 entry.promise = promise;
789 return entry;
790 }
791
792 int cancel() {
793 if (!cancelled) {
794 cancelled = true;
795 int pSize = pendingSize;
796
797
798 ReferenceCountUtil.safeRelease(msg);
799 msg = Unpooled.EMPTY_BUFFER;
800
801 pendingSize = 0;
802 total = 0;
803 progress = 0;
804 bufs = null;
805 buf = null;
806 return pSize;
807 }
808 return 0;
809 }
810
811 void recycle() {
812 next = null;
813 bufs = null;
814 buf = null;
815 msg = null;
816 promise = null;
817 progress = 0;
818 total = 0;
819 pendingSize = 0;
820 count = -1;
821 cancelled = false;
822 RECYCLER.recycle(this, handle);
823 }
824
825 Entry recycleAndGetNext() {
826 Entry next = this.next;
827 recycle();
828 return next;
829 }
830 }
831 }