1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.WritableByteChannel;
48 import java.util.Queue;
49 import java.util.concurrent.Executor;
50
51 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53 import static io.netty.channel.unix.FileDescriptor.pipe;
54 import static io.netty.util.internal.ObjectUtil.checkNotNull;
55 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56 import static io.netty.util.internal.StringUtil.className;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
/** Metadata instance handed out by {@link #metadata()}. */
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
/** Suffix appended to the "unsupported message type" error built in {@link #filterOutboundMessage(Object)}. */
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
        StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);

/** Task submitted to the event loop by {@link #doWrite(ChannelOutboundBuffer)} once its write spin count is used up. */
private final Runnable flushTask = new Runnable() {
    @Override
    public void run() {
        // Invokes flush0() on the unsafe directly instead of flush().
        // NOTE(review): presumably so that only already-flushed messages are written — confirm in AbstractEpollUnsafe.
        ((AbstractEpollUnsafe) unsafe()).flush0();
    }
};

// Splice state. The queue is created lazily by addToSpliceQueue(...); the pipe ends are created lazily by
// SpliceInChannelTask on the target channel and closed in doClose().
private volatile Queue<SpliceInTask> spliceQueue;
private FileDescriptor pipeIn;
private FileDescriptor pipeOut;

// Created on first use in writeFileRegion(...).
private WritableByteChannel byteChannel;
80
/**
 * Creates a child channel from a raw file descriptor by wrapping it in a {@link LinuxSocket}.
 *
 * @param parent the parent channel
 * @param fd the raw file descriptor to wrap
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

/**
 * Creates a parent-less channel from a raw file descriptor by wrapping it in a {@link LinuxSocket}.
 *
 * @param fd the raw file descriptor to wrap
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

/**
 * Creates a parent-less channel whose initial active state is the result of {@code isSoErrorZero(fd)}.
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}

/**
 * Creates a child channel for the given socket; {@code true} is passed to the super constructor as the
 * active flag.
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    // EPOLLRDHUP is passed as an additional initial op.
    // NOTE(review): presumably so a remote half-close is reported (the read loop checks isReceivedRdHup()).
    super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
}

/**
 * Creates a child channel for the given socket with a known remote address.
 *
 * @param parent the parent channel
 * @param fd the socket backing this channel
 * @param remote the remote address handed to the super constructor
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    // EPOLLRDHUP is passed as an additional initial op (see the read loop's isReceivedRdHup() check).
    super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
}

/**
 * Creates a parent-less channel for the given socket with an explicit active state.
 *
 * @param fd the socket backing this channel
 * @param active the active flag handed to the super constructor
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    // EPOLLRDHUP is passed as an additional initial op (see the read loop's isReceivedRdHup() check).
    super(null, fd, active, EpollIoOps.EPOLLRDHUP);
}
107
/**
 * Creates the stream-specific unsafe implementation, {@link EpollStreamUnsafe}.
 */
@Override
protected AbstractEpollUnsafe newUnsafe() {
    return new EpollStreamUnsafe();
}
112
/**
 * Returns the shared {@link ChannelMetadata} instance used by all stream channels of this type.
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
 * Splices up to {@code len} bytes from this channel to another {@link AbstractEpollStreamChannel},
 * using a newly created promise.
 *
 * @param ch the target channel
 * @param len the number of bytes to splice
 * @return the future notified about the outcome
 * @see #spliceTo(AbstractEpollStreamChannel, int, ChannelPromise)
 */
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
    return spliceTo(ch, len, newPromise());
}
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
151 final ChannelPromise promise) {
152 if (ch.eventLoop() != eventLoop()) {
153 throw new IllegalArgumentException("EventLoops are not the same.");
154 }
155 checkPositiveOrZero(len, "len");
156 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
157 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
158 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
159 }
160 checkNotNull(promise, "promise");
161 if (!isOpen()) {
162 promise.tryFailure(new ClosedChannelException());
163 } else {
164 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
165 failSpliceIfClosed(promise);
166 }
167 return promise;
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
 * Splices up to {@code len} bytes from this channel to a {@link FileDescriptor}, starting at
 * {@code offset} in the target, using a newly created promise.
 *
 * @param ch the target file descriptor
 * @param offset the offset within the target at which to start writing
 * @param len the number of bytes to splice
 * @return the future notified about the outcome
 * @see #spliceTo(FileDescriptor, int, int, ChannelPromise)
 */
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
    return spliceTo(ch, offset, len, newPromise());
}
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
203 final ChannelPromise promise) {
204 checkPositiveOrZero(len, "len");
205 checkPositiveOrZero(offset, "offset");
206 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
207 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
208 }
209 checkNotNull(promise, "promise");
210 if (!isOpen()) {
211 promise.tryFailure(new ClosedChannelException());
212 } else {
213 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
214 failSpliceIfClosed(promise);
215 }
216 return promise;
217 }
218
219 private void failSpliceIfClosed(ChannelPromise promise) {
220 if (!isOpen()) {
221
222
223 if (!promise.isDone()) {
224 final ClosedChannelException ex = new ClosedChannelException();
225 if (promise.tryFailure(ex)) {
226 eventLoop().execute(new Runnable() {
227 @Override
228 public void run() {
229
230 clearSpliceQueue(ex);
231 }
232 });
233 }
234 }
235 }
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
253 int readableBytes = buf.readableBytes();
254 if (readableBytes == 0) {
255 in.remove();
256 return 0;
257 }
258
259 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
260 return doWriteBytes(in, buf);
261 } else {
262 ByteBuffer[] nioBuffers = buf.nioBuffers();
263 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
264 config().getMaxBytesPerGatheringWrite());
265 }
266 }
267
268 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
269
270
271
272 if (attempted == written) {
273 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
274 config().setMaxBytesPerGatheringWrite(attempted << 1);
275 }
276 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
277 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
297 final long expectedWrittenBytes = array.size();
298 assert expectedWrittenBytes != 0;
299 final int cnt = array.count();
300 assert cnt != 0;
301
302 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
303 if (localWrittenBytes > 0) {
304 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
305 in.removeBytes(localWrittenBytes);
306 return 1;
307 }
308 return WRITE_STATUS_SNDBUF_FULL;
309 }
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 private int writeBytesMultiple(
330 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
331 long maxBytesPerGatheringWrite) throws IOException {
332 assert expectedWrittenBytes != 0;
333 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
334 expectedWrittenBytes = maxBytesPerGatheringWrite;
335 }
336
337 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
338 if (localWrittenBytes > 0) {
339 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
340 in.removeBytes(localWrittenBytes);
341 return 1;
342 }
343 return WRITE_STATUS_SNDBUF_FULL;
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
361 final long offset = region.transferred();
362 final long regionCount = region.count();
363 if (offset >= regionCount) {
364 in.remove();
365 return 0;
366 }
367
368 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
369 if (flushedAmount > 0) {
370 in.progress(flushedAmount);
371 if (region.transferred() >= regionCount) {
372 in.remove();
373 }
374 return 1;
375 } else if (flushedAmount == 0) {
376 validateFileRegion(region, offset);
377 }
378 return WRITE_STATUS_SNDBUF_FULL;
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
396 if (region.transferred() >= region.count()) {
397 in.remove();
398 return 0;
399 }
400
401 if (byteChannel == null) {
402 byteChannel = new EpollSocketWritableByteChannel();
403 }
404 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
405 if (flushedAmount > 0) {
406 in.progress(flushedAmount);
407 if (region.transferred() >= region.count()) {
408 in.remove();
409 }
410 return 1;
411 }
412 return WRITE_STATUS_SNDBUF_FULL;
413 }
414
/**
 * Writes flushed messages from {@code in} until the buffer is empty or the write spin count
 * (initially {@code config().getWriteSpinCount()}) is no longer positive.
 *
 * @param in the outbound buffer to drain
 * @throws Exception if a write fails
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();
    do {
        final int msgCount = in.size();
        // Use a gathering write when more than one message is pending and the first one is a ByteBuf.
        if (msgCount > 1 && in.current() instanceof ByteBuf) {
            writeSpinCount -= doWriteMultiple(in);
        } else if (msgCount == 0) {
            // Everything was written; stop asking for writability notifications.
            clearFlag(Native.EPOLLOUT);
            // Return directly so the code after the loop does not touch EPOLLOUT again.
            return;
        } else {
            // Exactly one message, or the first message is not a ByteBuf.
            writeSpinCount -= doWriteSingle(in);
        }

        // The loop re-reads in.size() on every pass, so messages that became flushed in the meantime
        // are picked up as long as spin count remains.
    } while (writeSpinCount > 0);

    if (writeSpinCount == 0) {
        // The spin count was used up exactly, i.e. no write reported WRITE_STATUS_SNDBUF_FULL.
        // NOTE(review): presumably the socket is still writable here, hence EPOLLOUT is cleared and the
        // remaining data is written by a task on the event loop instead.
        clearFlag(Native.EPOLLOUT);

        // Continue flushing later via the event loop.
        eventLoop().execute(flushTask);
    } else {
        // The counter went negative, which happens after a write returned WRITE_STATUS_SNDBUF_FULL:
        // register for EPOLLOUT to be notified once the socket accepts data again.
        setFlag(Native.EPOLLOUT);
    }
}
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
468
469 Object msg = in.current();
470 if (msg instanceof ByteBuf) {
471 return writeBytes(in, (ByteBuf) msg);
472 } else if (msg instanceof DefaultFileRegion) {
473 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
474 } else if (msg instanceof FileRegion) {
475 return writeFileRegion(in, (FileRegion) msg);
476 } else if (msg instanceof SpliceOutTask) {
477 if (!((SpliceOutTask) msg).spliceOut()) {
478 return WRITE_STATUS_SNDBUF_FULL;
479 }
480 in.remove();
481 return 1;
482 } else {
483
484 throw new Error("Unexpected message type: " + className(msg));
485 }
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
503 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
504 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
505 array.maxBytes(maxBytesPerGatheringWrite);
506 in.forEachFlushedMessage(array);
507
508 if (array.count() >= 1) {
509
510 return writeBytesMultiple(in, array);
511 }
512
513 in.removeBytes(0);
514 return 0;
515 }
516
517 @Override
518 protected Object filterOutboundMessage(Object msg) {
519 if (msg instanceof ByteBuf) {
520 ByteBuf buf = (ByteBuf) msg;
521 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
522 }
523
524 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
525 return msg;
526 }
527
528 throw new UnsupportedOperationException(
529 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
530 }
531
/**
 * Shuts down the output side of the socket, leaving the input side untouched.
 *
 * @throws Exception if the socket shutdown fails
 */
@Override
protected final void doShutdownOutput() throws Exception {
    socket.shutdown(false, true);
}
536
/**
 * Shuts down the input side of the socket and completes {@code promise} with the outcome.
 *
 * @param promise succeeded when the shutdown returns normally, failed with the thrown cause otherwise
 */
private void shutdownInput0(final ChannelPromise promise) {
    try {
        socket.shutdown(true, false);
        promise.setSuccess();
    } catch (Throwable cause) {
        promise.setFailure(cause);
    }
}
545
/**
 * Reports whether the output side is shut down, as tracked by the underlying socket.
 */
@Override
public boolean isOutputShutdown() {
    return socket.isOutputShutdown();
}
550
/**
 * Reports whether the input side is shut down, as tracked by the underlying socket.
 */
@Override
public boolean isInputShutdown() {
    return socket.isInputShutdown();
}
555
/**
 * Returns the underlying socket's {@code isShutdown()} state.
 */
@Override
public boolean isShutdown() {
    return socket.isShutdown();
}
560
/**
 * Shuts down the output side using a newly created promise.
 *
 * @return the future notified once the shutdown completes
 */
@Override
public ChannelFuture shutdownOutput() {
    return shutdownOutput(newPromise());
}
565
566 @Override
567 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
568 EventLoop loop = eventLoop();
569 if (loop.inEventLoop()) {
570 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
571 } else {
572 loop.execute(new Runnable() {
573 @Override
574 public void run() {
575 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
576 }
577 });
578 }
579
580 return promise;
581 }
582
/**
 * Shuts down the input side using a newly created promise.
 *
 * @return the future notified once the shutdown completes
 */
@Override
public ChannelFuture shutdownInput() {
    return shutdownInput(newPromise());
}
587
588 @Override
589 public ChannelFuture shutdownInput(final ChannelPromise promise) {
590 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
591 if (closeExecutor != null) {
592 closeExecutor.execute(new Runnable() {
593 @Override
594 public void run() {
595 shutdownInput0(promise);
596 }
597 });
598 } else {
599 EventLoop loop = eventLoop();
600 if (loop.inEventLoop()) {
601 shutdownInput0(promise);
602 } else {
603 loop.execute(new Runnable() {
604 @Override
605 public void run() {
606 shutdownInput0(promise);
607 }
608 });
609 }
610 }
611 return promise;
612 }
613
/**
 * Shuts down both directions using a newly created promise.
 *
 * @return the future notified once both shutdowns completed
 */
@Override
public ChannelFuture shutdown() {
    return shutdown(newPromise());
}
618
619 @Override
620 public ChannelFuture shutdown(final ChannelPromise promise) {
621 ChannelFuture shutdownOutputFuture = shutdownOutput();
622 if (shutdownOutputFuture.isDone()) {
623 shutdownOutputDone(shutdownOutputFuture, promise);
624 } else {
625 shutdownOutputFuture.addListener(new ChannelFutureListener() {
626 @Override
627 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
628 shutdownOutputDone(shutdownOutputFuture, promise);
629 }
630 });
631 }
632 return promise;
633 }
634
635 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
636 ChannelFuture shutdownInputFuture = shutdownInput();
637 if (shutdownInputFuture.isDone()) {
638 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
639 } else {
640 shutdownInputFuture.addListener(new ChannelFutureListener() {
641 @Override
642 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
643 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
644 }
645 });
646 }
647 }
648
649 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
650 ChannelFuture shutdownInputFuture,
651 ChannelPromise promise) {
652 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
653 Throwable shutdownInputCause = shutdownInputFuture.cause();
654 if (shutdownOutputCause != null) {
655 if (shutdownInputCause != null) {
656 logger.debug("Exception suppressed because a previous exception occurred.",
657 shutdownInputCause);
658 }
659 promise.setFailure(shutdownOutputCause);
660 } else if (shutdownInputCause != null) {
661 promise.setFailure(shutdownInputCause);
662 } else {
663 promise.setSuccess();
664 }
665 }
666
/**
 * Closes the channel, then releases the splice resources: both pipe ends are closed and every task
 * still in the splice queue is failed.
 *
 * @throws Exception if {@code super.doClose()} fails; the cleanup in {@code finally} still runs
 */
@Override
protected void doClose() throws Exception {
    try {
        // Close the channel before the splice cleanup below.
        // NOTE(review): presumably so that spliceTo(...) sees isOpen() == false from here on — confirm.
        super.doClose();
    } finally {
        safeClosePipe(pipeIn);
        safeClosePipe(pipeOut);
        // Passing null lets clearSpliceQueue(...) create the exception only if tasks are pending.
        clearSpliceQueue(null);
    }
}
678
679 private void clearSpliceQueue(ClosedChannelException exception) {
680 Queue<SpliceInTask> sQueue = spliceQueue;
681 if (sQueue == null) {
682 return;
683 }
684 for (;;) {
685 SpliceInTask task = sQueue.poll();
686 if (task == null) {
687 break;
688 }
689 if (exception == null) {
690 exception = new ClosedChannelException();
691 }
692 task.promise.tryFailure(exception);
693 }
694 }
695
696 private static void safeClosePipe(FileDescriptor fd) {
697 if (fd != null) {
698 try {
699 fd.close();
700 } catch (IOException e) {
701 logger.warn("Error while closing a pipe", e);
702 }
703 }
704 }
705
/**
 * Unsafe implementation for stream channels. Its read loop first serves queued splice tasks and
 * otherwise reads bytes into buffers that are fired through the pipeline.
 */
class EpollStreamUnsafe extends AbstractEpollUnsafe {
    // Overridden only to delegate; the outer class calls this from shutdownInput(ChannelPromise).
    @Override
    protected Executor prepareToClose() {
        return super.prepareToClose();
    }

    /**
     * Handles a {@link Throwable} raised inside the read loop: delivers or releases the in-flight buffer,
     * fires read-complete and exception-caught, and shuts down the input when the stream is finished or
     * the cause is an {@link OutOfMemoryError} or {@link IOException}.
     */
    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
                                     boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                // Data was read before the failure: still hand it to the pipeline.
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);

        // Shut down the input on EOF as well as on OutOfMemoryError and IOException;
        // other causes leave the input open.
        if (allDataRead || cause instanceof OutOfMemoryError || cause instanceof IOException) {
            shutdownInput(true);
        }
    }

    @Override
    EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
        return new EpollRecvByteAllocatorStreamingHandle(handle);
    }

    /**
     * Read loop invoked for readable events. While a splice task is queued it is served first; otherwise
     * data is read into freshly allocated buffers and fired through the pipeline.
     */
    @Override
    void epollInReady() {
        final ChannelConfig config = config();
        if (shouldBreakEpollInReady(config)) {
            clearEpollIn0();
            return;
        }
        final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean allDataRead = false;
        Queue<SpliceInTask> sQueue = null;
        try {
            do {
                // Read the volatile spliceQueue field only until a non-null queue has been observed.
                if (sQueue != null || (sQueue = spliceQueue) != null) {
                    SpliceInTask spliceTask = sQueue.peek();
                    if (spliceTask != null) {
                        boolean spliceInResult = spliceTask.spliceIn(allocHandle);

                        if (allocHandle.isReceivedRdHup()) {
                            shutdownInput(false);
                        }
                        if (spliceInResult) {
                            // The task is finished. Only remove it while the channel is still active.
                            // NOTE(review): presumably because doClose() already drained the queue
                            // otherwise — confirm.
                            if (isActive()) {
                                sQueue.remove();
                            }
                            continue;
                        } else {
                            break;
                        }
                    }
                }

                // No splice task pending: do a regular read into a buffer from the allocator handle.
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // Nothing was read; release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    allDataRead = allocHandle.lastBytesRead() < 0;
                    if (allDataRead) {
                        // A negative read count is treated as end of stream.
                        readPending = false;
                    }
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (shouldBreakEpollInReady(config)) {
                    // Re-checked after every fireChannelRead(...).
                    // NOTE(review): presumably true once a handler shut down the input or closed the
                    // channel during the callback — confirm in AbstractEpollChannel.
                    break;
                }
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (allDataRead) {
                shutdownInput(true);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
        } finally {
            if (sQueue == null) {
                if (shouldStopReading(config)) {
                    clearEpollIn();
                }
            } else {
                // A splice queue exists: only the auto-read setting decides whether to keep reading.
                if (!config.isAutoRead()) {
                    clearEpollIn();
                }
            }
        }
    }
}
834
835 private void addToSpliceQueue(final SpliceInTask task) {
836 Queue<SpliceInTask> sQueue = spliceQueue;
837 if (sQueue == null) {
838 synchronized (this) {
839 sQueue = spliceQueue;
840 if (sQueue == null) {
841 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
842 }
843 }
844 }
845 sQueue.add(task);
846 }
847
848 protected abstract class SpliceInTask {
849 final ChannelPromise promise;
850 int len;
851
852 protected SpliceInTask(int len, ChannelPromise promise) {
853 this.promise = promise;
854 this.len = len;
855 }
856
857 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
858
859 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
860
861 int length = Math.min(handle.guess(), len);
862 int splicedIn = 0;
863 for (;;) {
864
865 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
866 handle.lastBytesRead(localSplicedIn);
867 if (localSplicedIn == 0) {
868 break;
869 }
870 splicedIn += localSplicedIn;
871 length -= localSplicedIn;
872 }
873
874 return splicedIn;
875 }
876 }
877
878
/**
 * Splice task whose target is another {@link AbstractEpollStreamChannel}. Data is spliced from this
 * channel's socket into a pipe owned by the target channel, and a {@code SpliceOutTask} is then written
 * to the target to move it from the pipe to the target's socket. The task doubles as the listener for
 * the intermediate write promises.
 */
private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
    private final AbstractEpollStreamChannel ch;

    SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
        super(len, promise);
        this.ch = ch;
    }

    /** Propagates the failure of an intermediate splice-out write to the task's promise. */
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            // tryFailure(...) because the promise may already have been completed elsewhere.
            promise.tryFailure(future.cause());
        }
    }

    @Override
    public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
        assert ch.eventLoop().inEventLoop();
        if (len == 0) {
            // Nothing left to splice; trySuccess() because the promise may already be completed.
            promise.trySuccess();
            return true;
        }
        try {
            // The pipe is stored on the TARGET channel and reused by every task that splices into it.
            // NOTE(review): presumably to keep pending splice-outs ordered when several channels splice
            // into the same target — confirm.
            FileDescriptor pipeOut = ch.pipeOut;
            if (pipeOut == null) {
                // No pipe was created for the target yet.
                FileDescriptor[] pipe = pipe();
                ch.pipeIn = pipe[0];
                pipeOut = ch.pipeOut = pipe[1];
            }

            int splicedIn = spliceIn(pipeOut, handle);
            if (splicedIn > 0) {
                // Integer.MAX_VALUE is never counted down, so such a task does not finish by length.
                if (len != Integer.MAX_VALUE) {
                    len -= splicedIn;
                }

                // The last chunk completes the task's own promise; earlier chunks use a fresh promise
                // on the target channel with this task as failure listener.
                final ChannelPromise splicePromise;
                if (len == 0) {
                    splicePromise = promise;
                } else {
                    splicePromise = ch.newPromise().addListener(this);
                }

                boolean autoRead = config().isAutoRead();

                // Write and flush through the target's unsafe directly instead of through its pipeline.
                ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
                ch.unsafe().flush();
                if (autoRead && !splicePromise.isDone()) {
                    // The splice-out did not complete during the flush. Auto-read is switched off here;
                    // SpliceOutTask switches it back on when it finishes or fails with an IOException.
                    config().setAutoRead(false);
                }
            }

            return len == 0;
        } catch (Throwable cause) {
            // tryFailure(...) because the promise may already have been completed elsewhere.
            promise.tryFailure(cause);
            return true;
        }
    }
}
954
955 private final class SpliceOutTask {
956 private final AbstractEpollStreamChannel ch;
957 private final boolean autoRead;
958 private int len;
959
960 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
961 this.ch = ch;
962 this.len = len;
963 this.autoRead = autoRead;
964 }
965
966 public boolean spliceOut() throws Exception {
967 assert ch.eventLoop().inEventLoop();
968 try {
969 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
970 len -= splicedOut;
971 if (len == 0) {
972 if (autoRead) {
973
974 config().setAutoRead(true);
975 }
976 return true;
977 }
978 return false;
979 } catch (IOException e) {
980 if (autoRead) {
981
982 config().setAutoRead(true);
983 }
984 throw e;
985 }
986 }
987 }
988
989 private final class SpliceFdTask extends SpliceInTask {
990 private final FileDescriptor fd;
991 private final ChannelPromise promise;
992 private int offset;
993
994 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
995 super(len, promise);
996 this.fd = fd;
997 this.promise = promise;
998 this.offset = offset;
999 }
1000
1001 @Override
1002 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1003 assert eventLoop().inEventLoop();
1004 if (len == 0) {
1005
1006 promise.trySuccess();
1007 return true;
1008 }
1009
1010 try {
1011 FileDescriptor[] pipe = pipe();
1012 FileDescriptor pipeIn = pipe[0];
1013 FileDescriptor pipeOut = pipe[1];
1014 try {
1015 int splicedIn = spliceIn(pipeOut, handle);
1016 if (splicedIn > 0) {
1017
1018 if (len != Integer.MAX_VALUE) {
1019 len -= splicedIn;
1020 }
1021 do {
1022 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1023 offset += splicedOut;
1024 splicedIn -= splicedOut;
1025 } while (splicedIn > 0);
1026 if (len == 0) {
1027
1028 promise.trySuccess();
1029 return true;
1030 }
1031 }
1032 return false;
1033 } finally {
1034 safeClosePipe(pipeIn);
1035 safeClosePipe(pipeOut);
1036 }
1037 } catch (Throwable cause) {
1038
1039 promise.tryFailure(cause);
1040 return true;
1041 }
1042 }
1043 }
1044
/**
 * {@link SocketWritableByteChannel} bound to this channel's socket. It is used by
 * {@code writeFileRegion(...)} as the transfer target for generic {@link FileRegion}s.
 */
private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
    EpollSocketWritableByteChannel() {
        super(socket);
        // Sanity check that the inherited fd refers to the same socket that was passed to super(...).
        assert fd == socket;
    }

    /** Sends the given range of {@code buf} through the socket and returns the socket's result. */
    @Override
    protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
        return socket.send(buf, pos, limit);
    }

    /** Returns the allocator of the enclosing channel. */
    @Override
    protected ByteBufAllocator alloc() {
        return AbstractEpollStreamChannel.this.alloc();
    }
}
1061 }