1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.WritableByteChannel;
48 import java.util.Queue;
49 import java.util.concurrent.Executor;
50
51 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53 import static io.netty.channel.unix.FileDescriptor.pipe;
54 import static io.netty.util.internal.ObjectUtil.checkNotNull;
55 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56
57 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
58 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
59 private static final String EXPECTED_TYPES =
60 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
61 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
62 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63
64 private final Runnable flushTask = new Runnable() {
65 @Override
66 public void run() {
67
68
69 ((AbstractEpollUnsafe) unsafe()).flush0();
70 }
71 };
72
73
74 private volatile Queue<SpliceInTask> spliceQueue;
75 private FileDescriptor pipeIn;
76 private FileDescriptor pipeOut;
77
78 private WritableByteChannel byteChannel;
79
80 protected AbstractEpollStreamChannel(Channel parent, int fd) {
81 this(parent, new LinuxSocket(fd));
82 }
83
84 protected AbstractEpollStreamChannel(int fd) {
85 this(new LinuxSocket(fd));
86 }
87
88 AbstractEpollStreamChannel(LinuxSocket fd) {
89 this(fd, isSoErrorZero(fd));
90 }
91
92 AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
93
94 super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
95 }
96
97 protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
98
99 super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
100 }
101
102 protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
103
104 super(null, fd, active, EpollIoOps.EPOLLRDHUP);
105 }
106
107 @Override
108 protected AbstractEpollUnsafe newUnsafe() {
109 return new EpollStreamUnsafe();
110 }
111
112 @Override
113 public ChannelMetadata metadata() {
114 return METADATA;
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
132 return spliceTo(ch, len, newPromise());
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
150 final ChannelPromise promise) {
151 if (ch.eventLoop() != eventLoop()) {
152 throw new IllegalArgumentException("EventLoops are not the same.");
153 }
154 checkPositiveOrZero(len, "len");
155 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
156 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
157 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
158 }
159 checkNotNull(promise, "promise");
160 if (!isOpen()) {
161 promise.tryFailure(new ClosedChannelException());
162 } else {
163 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
164 failSpliceIfClosed(promise);
165 }
166 return promise;
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
184 return spliceTo(ch, offset, len, newPromise());
185 }
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
202 final ChannelPromise promise) {
203 checkPositiveOrZero(len, "len");
204 checkPositiveOrZero(offset, "offset");
205 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
206 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
207 }
208 checkNotNull(promise, "promise");
209 if (!isOpen()) {
210 promise.tryFailure(new ClosedChannelException());
211 } else {
212 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
213 failSpliceIfClosed(promise);
214 }
215 return promise;
216 }
217
218 private void failSpliceIfClosed(ChannelPromise promise) {
219 if (!isOpen()) {
220
221
222 if (!promise.isDone()) {
223 final ClosedChannelException ex = new ClosedChannelException();
224 if (promise.tryFailure(ex)) {
225 eventLoop().execute(new Runnable() {
226 @Override
227 public void run() {
228
229 clearSpliceQueue(ex);
230 }
231 });
232 }
233 }
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
252 int readableBytes = buf.readableBytes();
253 if (readableBytes == 0) {
254 in.remove();
255 return 0;
256 }
257
258 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
259 return doWriteBytes(in, buf);
260 } else {
261 ByteBuffer[] nioBuffers = buf.nioBuffers();
262 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
263 config().getMaxBytesPerGatheringWrite());
264 }
265 }
266
267 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
268
269
270
271 if (attempted == written) {
272 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
273 config().setMaxBytesPerGatheringWrite(attempted << 1);
274 }
275 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
276 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
277 }
278 }
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
296 final long expectedWrittenBytes = array.size();
297 assert expectedWrittenBytes != 0;
298 final int cnt = array.count();
299 assert cnt != 0;
300
301 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
302 if (localWrittenBytes > 0) {
303 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
304 in.removeBytes(localWrittenBytes);
305 return 1;
306 }
307 return WRITE_STATUS_SNDBUF_FULL;
308 }
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 private int writeBytesMultiple(
329 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
330 long maxBytesPerGatheringWrite) throws IOException {
331 assert expectedWrittenBytes != 0;
332 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
333 expectedWrittenBytes = maxBytesPerGatheringWrite;
334 }
335
336 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
337 if (localWrittenBytes > 0) {
338 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
339 in.removeBytes(localWrittenBytes);
340 return 1;
341 }
342 return WRITE_STATUS_SNDBUF_FULL;
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
360 final long offset = region.transferred();
361 final long regionCount = region.count();
362 if (offset >= regionCount) {
363 in.remove();
364 return 0;
365 }
366
367 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
368 if (flushedAmount > 0) {
369 in.progress(flushedAmount);
370 if (region.transferred() >= regionCount) {
371 in.remove();
372 }
373 return 1;
374 } else if (flushedAmount == 0) {
375 validateFileRegion(region, offset);
376 }
377 return WRITE_STATUS_SNDBUF_FULL;
378 }
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
395 if (region.transferred() >= region.count()) {
396 in.remove();
397 return 0;
398 }
399
400 if (byteChannel == null) {
401 byteChannel = new EpollSocketWritableByteChannel();
402 }
403 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
404 if (flushedAmount > 0) {
405 in.progress(flushedAmount);
406 if (region.transferred() >= region.count()) {
407 in.remove();
408 }
409 return 1;
410 }
411 return WRITE_STATUS_SNDBUF_FULL;
412 }
413
414 @Override
415 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
416 int writeSpinCount = config().getWriteSpinCount();
417 do {
418 final int msgCount = in.size();
419
420 if (msgCount > 1 && in.current() instanceof ByteBuf) {
421 writeSpinCount -= doWriteMultiple(in);
422 } else if (msgCount == 0) {
423
424 clearFlag(Native.EPOLLOUT);
425
426 return;
427 } else {
428 writeSpinCount -= doWriteSingle(in);
429 }
430
431
432
433
434 } while (writeSpinCount > 0);
435
436 if (writeSpinCount == 0) {
437
438
439
440
441 clearFlag(Native.EPOLLOUT);
442
443
444 eventLoop().execute(flushTask);
445 } else {
446
447
448 setFlag(Native.EPOLLOUT);
449 }
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
467
468 Object msg = in.current();
469 if (msg instanceof ByteBuf) {
470 return writeBytes(in, (ByteBuf) msg);
471 } else if (msg instanceof DefaultFileRegion) {
472 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
473 } else if (msg instanceof FileRegion) {
474 return writeFileRegion(in, (FileRegion) msg);
475 } else if (msg instanceof SpliceOutTask) {
476 if (!((SpliceOutTask) msg).spliceOut()) {
477 return WRITE_STATUS_SNDBUF_FULL;
478 }
479 in.remove();
480 return 1;
481 } else {
482
483 throw new Error();
484 }
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
502 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
503 IovArray array = registration().ioHandler().cleanIovArray();
504 array.maxBytes(maxBytesPerGatheringWrite);
505 in.forEachFlushedMessage(array);
506
507 if (array.count() >= 1) {
508
509 return writeBytesMultiple(in, array);
510 }
511
512 in.removeBytes(0);
513 return 0;
514 }
515
516 @Override
517 protected Object filterOutboundMessage(Object msg) {
518 if (msg instanceof ByteBuf) {
519 ByteBuf buf = (ByteBuf) msg;
520 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
521 }
522
523 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
524 return msg;
525 }
526
527 throw new UnsupportedOperationException(
528 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
529 }
530
531 @Override
532 protected final void doShutdownOutput() throws Exception {
533 socket.shutdown(false, true);
534 }
535
536 private void shutdownInput0(final ChannelPromise promise) {
537 try {
538 socket.shutdown(true, false);
539 promise.setSuccess();
540 } catch (Throwable cause) {
541 promise.setFailure(cause);
542 }
543 }
544
545 @Override
546 public boolean isOutputShutdown() {
547 return socket.isOutputShutdown();
548 }
549
550 @Override
551 public boolean isInputShutdown() {
552 return socket.isInputShutdown();
553 }
554
555 @Override
556 public boolean isShutdown() {
557 return socket.isShutdown();
558 }
559
560 @Override
561 public ChannelFuture shutdownOutput() {
562 return shutdownOutput(newPromise());
563 }
564
565 @Override
566 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
567 EventLoop loop = eventLoop();
568 if (loop.inEventLoop()) {
569 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
570 } else {
571 loop.execute(new Runnable() {
572 @Override
573 public void run() {
574 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
575 }
576 });
577 }
578
579 return promise;
580 }
581
582 @Override
583 public ChannelFuture shutdownInput() {
584 return shutdownInput(newPromise());
585 }
586
587 @Override
588 public ChannelFuture shutdownInput(final ChannelPromise promise) {
589 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
590 if (closeExecutor != null) {
591 closeExecutor.execute(new Runnable() {
592 @Override
593 public void run() {
594 shutdownInput0(promise);
595 }
596 });
597 } else {
598 EventLoop loop = eventLoop();
599 if (loop.inEventLoop()) {
600 shutdownInput0(promise);
601 } else {
602 loop.execute(new Runnable() {
603 @Override
604 public void run() {
605 shutdownInput0(promise);
606 }
607 });
608 }
609 }
610 return promise;
611 }
612
613 @Override
614 public ChannelFuture shutdown() {
615 return shutdown(newPromise());
616 }
617
618 @Override
619 public ChannelFuture shutdown(final ChannelPromise promise) {
620 ChannelFuture shutdownOutputFuture = shutdownOutput();
621 if (shutdownOutputFuture.isDone()) {
622 shutdownOutputDone(shutdownOutputFuture, promise);
623 } else {
624 shutdownOutputFuture.addListener(new ChannelFutureListener() {
625 @Override
626 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
627 shutdownOutputDone(shutdownOutputFuture, promise);
628 }
629 });
630 }
631 return promise;
632 }
633
634 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
635 ChannelFuture shutdownInputFuture = shutdownInput();
636 if (shutdownInputFuture.isDone()) {
637 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
638 } else {
639 shutdownInputFuture.addListener(new ChannelFutureListener() {
640 @Override
641 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
642 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
643 }
644 });
645 }
646 }
647
648 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
649 ChannelFuture shutdownInputFuture,
650 ChannelPromise promise) {
651 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
652 Throwable shutdownInputCause = shutdownInputFuture.cause();
653 if (shutdownOutputCause != null) {
654 if (shutdownInputCause != null) {
655 logger.debug("Exception suppressed because a previous exception occurred.",
656 shutdownInputCause);
657 }
658 promise.setFailure(shutdownOutputCause);
659 } else if (shutdownInputCause != null) {
660 promise.setFailure(shutdownInputCause);
661 } else {
662 promise.setSuccess();
663 }
664 }
665
666 @Override
667 protected void doClose() throws Exception {
668 try {
669
670 super.doClose();
671 } finally {
672 safeClosePipe(pipeIn);
673 safeClosePipe(pipeOut);
674 clearSpliceQueue(null);
675 }
676 }
677
678 private void clearSpliceQueue(ClosedChannelException exception) {
679 Queue<SpliceInTask> sQueue = spliceQueue;
680 if (sQueue == null) {
681 return;
682 }
683 for (;;) {
684 SpliceInTask task = sQueue.poll();
685 if (task == null) {
686 break;
687 }
688 if (exception == null) {
689 exception = new ClosedChannelException();
690 }
691 task.promise.tryFailure(exception);
692 }
693 }
694
695 private static void safeClosePipe(FileDescriptor fd) {
696 if (fd != null) {
697 try {
698 fd.close();
699 } catch (IOException e) {
700 logger.warn("Error while closing a pipe", e);
701 }
702 }
703 }
704
705 class EpollStreamUnsafe extends AbstractEpollUnsafe {
706
707 @Override
708 protected Executor prepareToClose() {
709 return super.prepareToClose();
710 }
711
712 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
713 boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
714 if (byteBuf != null) {
715 if (byteBuf.isReadable()) {
716 readPending = false;
717 pipeline.fireChannelRead(byteBuf);
718 } else {
719 byteBuf.release();
720 }
721 }
722 allocHandle.readComplete();
723 pipeline.fireChannelReadComplete();
724 pipeline.fireExceptionCaught(cause);
725
726
727
728 if (allDataRead || cause instanceof OutOfMemoryError || cause instanceof IOException) {
729 shutdownInput(true);
730 }
731 }
732
733 @Override
734 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
735 return new EpollRecvByteAllocatorStreamingHandle(handle);
736 }
737
738 @Override
739 void epollInReady() {
740 final ChannelConfig config = config();
741 if (shouldBreakEpollInReady(config)) {
742 clearEpollIn0();
743 return;
744 }
745 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
746 final ChannelPipeline pipeline = pipeline();
747 final ByteBufAllocator allocator = config.getAllocator();
748 allocHandle.reset(config);
749
750 ByteBuf byteBuf = null;
751 boolean allDataRead = false;
752 Queue<SpliceInTask> sQueue = null;
753 try {
754 do {
755 if (sQueue != null || (sQueue = spliceQueue) != null) {
756 SpliceInTask spliceTask = sQueue.peek();
757 if (spliceTask != null) {
758 boolean spliceInResult = spliceTask.spliceIn(allocHandle);
759
760 if (allocHandle.isReceivedRdHup()) {
761 shutdownInput(false);
762 }
763 if (spliceInResult) {
764
765
766 if (isActive()) {
767 sQueue.remove();
768 }
769 continue;
770 } else {
771 break;
772 }
773 }
774 }
775
776
777
778 byteBuf = allocHandle.allocate(allocator);
779 allocHandle.lastBytesRead(doReadBytes(byteBuf));
780 if (allocHandle.lastBytesRead() <= 0) {
781
782 byteBuf.release();
783 byteBuf = null;
784 allDataRead = allocHandle.lastBytesRead() < 0;
785 if (allDataRead) {
786
787 readPending = false;
788 }
789 break;
790 }
791 allocHandle.incMessagesRead(1);
792 readPending = false;
793 pipeline.fireChannelRead(byteBuf);
794 byteBuf = null;
795
796 if (shouldBreakEpollInReady(config)) {
797
798
799
800
801
802
803
804
805
806
807
808 break;
809 }
810 } while (allocHandle.continueReading());
811
812 allocHandle.readComplete();
813 pipeline.fireChannelReadComplete();
814
815 if (allDataRead) {
816 shutdownInput(true);
817 }
818 } catch (Throwable t) {
819 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
820 } finally {
821 if (sQueue == null) {
822 if (shouldStopReading(config)) {
823 clearEpollIn();
824 }
825 } else {
826 if (!config.isAutoRead()) {
827 clearEpollIn();
828 }
829 }
830 }
831 }
832 }
833
834 private void addToSpliceQueue(final SpliceInTask task) {
835 Queue<SpliceInTask> sQueue = spliceQueue;
836 if (sQueue == null) {
837 synchronized (this) {
838 sQueue = spliceQueue;
839 if (sQueue == null) {
840 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
841 }
842 }
843 }
844 sQueue.add(task);
845 }
846
847 protected abstract class SpliceInTask {
848 final ChannelPromise promise;
849 int len;
850
851 protected SpliceInTask(int len, ChannelPromise promise) {
852 this.promise = promise;
853 this.len = len;
854 }
855
856 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
857
858 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
859
860 int length = Math.min(handle.guess(), len);
861 int splicedIn = 0;
862 for (;;) {
863
864 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
865 handle.lastBytesRead(localSplicedIn);
866 if (localSplicedIn == 0) {
867 break;
868 }
869 splicedIn += localSplicedIn;
870 length -= localSplicedIn;
871 }
872
873 return splicedIn;
874 }
875 }
876
877
878 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
879 private final AbstractEpollStreamChannel ch;
880
881 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
882 super(len, promise);
883 this.ch = ch;
884 }
885
886 @Override
887 public void operationComplete(ChannelFuture future) throws Exception {
888 if (!future.isSuccess()) {
889
890 promise.tryFailure(future.cause());
891 }
892 }
893
894 @Override
895 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
896 assert ch.eventLoop().inEventLoop();
897 if (len == 0) {
898
899 promise.trySuccess();
900 return true;
901 }
902 try {
903
904
905
906 FileDescriptor pipeOut = ch.pipeOut;
907 if (pipeOut == null) {
908
909 FileDescriptor[] pipe = pipe();
910 ch.pipeIn = pipe[0];
911 pipeOut = ch.pipeOut = pipe[1];
912 }
913
914 int splicedIn = spliceIn(pipeOut, handle);
915 if (splicedIn > 0) {
916
917 if (len != Integer.MAX_VALUE) {
918 len -= splicedIn;
919 }
920
921
922
923 final ChannelPromise splicePromise;
924 if (len == 0) {
925 splicePromise = promise;
926 } else {
927 splicePromise = ch.newPromise().addListener(this);
928 }
929
930 boolean autoRead = config().isAutoRead();
931
932
933
934 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
935 ch.unsafe().flush();
936 if (autoRead && !splicePromise.isDone()) {
937
938
939
940
941 config().setAutoRead(false);
942 }
943 }
944
945 return len == 0;
946 } catch (Throwable cause) {
947
948 promise.tryFailure(cause);
949 return true;
950 }
951 }
952 }
953
954 private final class SpliceOutTask {
955 private final AbstractEpollStreamChannel ch;
956 private final boolean autoRead;
957 private int len;
958
959 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
960 this.ch = ch;
961 this.len = len;
962 this.autoRead = autoRead;
963 }
964
965 public boolean spliceOut() throws Exception {
966 assert ch.eventLoop().inEventLoop();
967 try {
968 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
969 len -= splicedOut;
970 if (len == 0) {
971 if (autoRead) {
972
973 config().setAutoRead(true);
974 }
975 return true;
976 }
977 return false;
978 } catch (IOException e) {
979 if (autoRead) {
980
981 config().setAutoRead(true);
982 }
983 throw e;
984 }
985 }
986 }
987
988 private final class SpliceFdTask extends SpliceInTask {
989 private final FileDescriptor fd;
990 private final ChannelPromise promise;
991 private int offset;
992
993 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
994 super(len, promise);
995 this.fd = fd;
996 this.promise = promise;
997 this.offset = offset;
998 }
999
1000 @Override
1001 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1002 assert eventLoop().inEventLoop();
1003 if (len == 0) {
1004
1005 promise.trySuccess();
1006 return true;
1007 }
1008
1009 try {
1010 FileDescriptor[] pipe = pipe();
1011 FileDescriptor pipeIn = pipe[0];
1012 FileDescriptor pipeOut = pipe[1];
1013 try {
1014 int splicedIn = spliceIn(pipeOut, handle);
1015 if (splicedIn > 0) {
1016
1017 if (len != Integer.MAX_VALUE) {
1018 len -= splicedIn;
1019 }
1020 do {
1021 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1022 offset += splicedOut;
1023 splicedIn -= splicedOut;
1024 } while (splicedIn > 0);
1025 if (len == 0) {
1026
1027 promise.trySuccess();
1028 return true;
1029 }
1030 }
1031 return false;
1032 } finally {
1033 safeClosePipe(pipeIn);
1034 safeClosePipe(pipeOut);
1035 }
1036 } catch (Throwable cause) {
1037
1038 promise.tryFailure(cause);
1039 return true;
1040 }
1041 }
1042 }
1043
1044 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1045 EpollSocketWritableByteChannel() {
1046 super(socket);
1047 assert fd == socket;
1048 }
1049
1050 @Override
1051 protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1052 return socket.send(buf, pos, limit);
1053 }
1054
1055 @Override
1056 protected ByteBufAllocator alloc() {
1057 return AbstractEpollStreamChannel.this.alloc();
1058 }
1059 }
1060 }