1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.WritableByteChannel;
48 import java.util.Queue;
49 import java.util.concurrent.Executor;
50
51 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53 import static io.netty.channel.unix.FileDescriptor.pipe;
54 import static io.netty.util.internal.ObjectUtil.checkNotNull;
55 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56
57 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
// Channel metadata shared by every instance and handed out by metadata().
// NOTE(review): the (false, 16) arguments presumably mean "no disconnect support" and a default of
// 16 messages per read - confirm against ChannelMetadata.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
// Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
        StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63
// Task that doWrite(...) hands to the event loop once its write spin count is used up, so that
// flushing continues later.
private final Runnable flushTask = new Runnable() {
    @Override
    public void run() {
        // flush0() is invoked on the unsafe directly rather than going through flush().
        // NOTE(review): presumably so that messages written in the meantime are not flushed as well - confirm.
        ((AbstractEpollUnsafe) unsafe()).flush0();
    }
};
72
73
// Pending splice-in tasks; created lazily by addToSpliceQueue(...) on the first spliceTo(...) call.
private volatile Queue<SpliceInTask> spliceQueue;
// Pipe used when this channel is the target of a channel-to-channel splice. Created lazily by
// SpliceInChannelTask: data is spliced into pipeOut and later spliced out of pipeIn to the socket.
// Both ends are closed in doClose().
private FileDescriptor pipeIn;
private FileDescriptor pipeOut;

// Lazily created by writeFileRegion(...) and passed to FileRegion.transferTo(...).
private WritableByteChannel byteChannel;
79
/**
 * Creates a stream channel for an existing raw file descriptor with the given parent.
 *
 * @param parent the parent {@link Channel}, forwarded to the {@link LinuxSocket} based constructor
 * @param fd     the raw file descriptor, wrapped into a new {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}
83
/**
 * Creates a parent-less stream channel for an existing raw file descriptor.
 *
 * @param fd the raw file descriptor, wrapped into a new {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}
87
/**
 * Creates a parent-less stream channel whose {@code active} state is the result of
 * {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}
91
/**
 * Creates a stream channel with the given parent; {@code true} is passed to the super constructor
 * in the position where the sibling constructor passes its {@code active} flag.
 *
 * @param parent the parent {@link Channel}
 * @param fd     the socket backing this channel
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, true);
    // Also register for EPOLLRDHUP (per epoll_ctl(2): the peer closed, or shut down the writing
    // half of, the connection).
    flags |= Native.EPOLLRDHUP;
}
97
/**
 * Creates a stream channel with the given parent and remote address.
 *
 * @param parent the parent {@link Channel}
 * @param fd     the socket backing this channel
 * @param remote the remote address handed to the super constructor
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
    // Also register for EPOLLRDHUP (per epoll_ctl(2): the peer closed, or shut down the writing
    // half of, the connection).
    flags |= Native.EPOLLRDHUP;
}
103
/**
 * Creates a parent-less stream channel with an explicit {@code active} state.
 *
 * @param fd     the socket backing this channel
 * @param active the active flag handed to the super constructor
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active);
    // Also register for EPOLLRDHUP (per epoll_ctl(2): the peer closed, or shut down the writing
    // half of, the connection).
    flags |= Native.EPOLLRDHUP;
}
109
/**
 * Creates the unsafe for this channel.
 *
 * @return a new {@link EpollStreamUnsafe}
 */
@Override
protected AbstractEpollUnsafe newUnsafe() {
    return new EpollStreamUnsafe();
}
114
/**
 * Returns the metadata instance shared by all stream channels of this type.
 *
 * @return the static {@code METADATA} constant
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
135 return spliceTo(ch, len, newPromise());
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
153 final ChannelPromise promise) {
154 if (ch.eventLoop() != eventLoop()) {
155 throw new IllegalArgumentException("EventLoops are not the same.");
156 }
157 checkPositiveOrZero(len, "len");
158 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
159 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
160 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
161 }
162 checkNotNull(promise, "promise");
163 if (!isOpen()) {
164 promise.tryFailure(new ClosedChannelException());
165 } else {
166 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
167 failSpliceIfClosed(promise);
168 }
169 return promise;
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
187 return spliceTo(ch, offset, len, newPromise());
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
205 final ChannelPromise promise) {
206 checkPositiveOrZero(len, "len");
207 checkPositiveOrZero(offset, "offset");
208 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
209 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
210 }
211 checkNotNull(promise, "promise");
212 if (!isOpen()) {
213 promise.tryFailure(new ClosedChannelException());
214 } else {
215 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
216 failSpliceIfClosed(promise);
217 }
218 return promise;
219 }
220
221 private void failSpliceIfClosed(ChannelPromise promise) {
222 if (!isOpen()) {
223
224
225 if (!promise.isDone()) {
226 final ClosedChannelException ex = new ClosedChannelException();
227 if (promise.tryFailure(ex)) {
228 eventLoop().execute(new Runnable() {
229 @Override
230 public void run() {
231
232 clearSpliceQueue(ex);
233 }
234 });
235 }
236 }
237 }
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
255 int readableBytes = buf.readableBytes();
256 if (readableBytes == 0) {
257 in.remove();
258 return 0;
259 }
260
261 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
262 return doWriteBytes(in, buf);
263 } else {
264 ByteBuffer[] nioBuffers = buf.nioBuffers();
265 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
266 config().getMaxBytesPerGatheringWrite());
267 }
268 }
269
270 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
271
272
273
274 if (attempted == written) {
275 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
276 config().setMaxBytesPerGatheringWrite(attempted << 1);
277 }
278 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
279 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
280 }
281 }
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
299 final long expectedWrittenBytes = array.size();
300 assert expectedWrittenBytes != 0;
301 final int cnt = array.count();
302 assert cnt != 0;
303
304 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
305 if (localWrittenBytes > 0) {
306 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
307 in.removeBytes(localWrittenBytes);
308 return 1;
309 }
310 return WRITE_STATUS_SNDBUF_FULL;
311 }
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331 private int writeBytesMultiple(
332 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
333 long maxBytesPerGatheringWrite) throws IOException {
334 assert expectedWrittenBytes != 0;
335 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
336 expectedWrittenBytes = maxBytesPerGatheringWrite;
337 }
338
339 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
340 if (localWrittenBytes > 0) {
341 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
342 in.removeBytes(localWrittenBytes);
343 return 1;
344 }
345 return WRITE_STATUS_SNDBUF_FULL;
346 }
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
363 final long offset = region.transferred();
364 final long regionCount = region.count();
365 if (offset >= regionCount) {
366 in.remove();
367 return 0;
368 }
369
370 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
371 if (flushedAmount > 0) {
372 in.progress(flushedAmount);
373 if (region.transferred() >= regionCount) {
374 in.remove();
375 }
376 return 1;
377 } else if (flushedAmount == 0) {
378 validateFileRegion(region, offset);
379 }
380 return WRITE_STATUS_SNDBUF_FULL;
381 }
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
398 if (region.transferred() >= region.count()) {
399 in.remove();
400 return 0;
401 }
402
403 if (byteChannel == null) {
404 byteChannel = new EpollSocketWritableByteChannel();
405 }
406 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
407 if (flushedAmount > 0) {
408 in.progress(flushedAmount);
409 if (region.transferred() >= region.count()) {
410 in.remove();
411 }
412 return 1;
413 }
414 return WRITE_STATUS_SNDBUF_FULL;
415 }
416
/**
 * Writes the flushed entries of {@code in} to the socket, bounded by the configured write spin
 * count. Each helper call returns the amount to subtract from the remaining spin count.
 *
 * @param in the outbound buffer to drain
 * @throws Exception if one of the write helpers fails
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();
    do {
        final int msgCount = in.size();
        // More than one pending entry and the first one is a ByteBuf: try a gathering write.
        if (msgCount > 1 && in.current() instanceof ByteBuf) {
            writeSpinCount -= doWriteMultiple(in);
        } else if (msgCount == 0) {
            // Everything was written, so writability notifications are no longer needed.
            clearFlag(Native.EPOLLOUT);
            // Return right away so the code after the loop does not touch EPOLLOUT again.
            return;
        } else {
            // Exactly one entry, or the first entry is not a ByteBuf.
            writeSpinCount -= doWriteSingle(in);
        }

        // The loop is only left through the spin count (or the return above); in.size() is
        // re-read on every iteration, so entries flushed in the meantime are picked up.
    } while (writeSpinCount > 0);

    if (writeSpinCount == 0) {
        // The spin count was used up exactly, i.e. no helper reported a full send buffer.
        // Stop waiting for EPOLLOUT and continue flushing later from the event loop instead.
        clearFlag(Native.EPOLLOUT);

        // Continue writing later via flushTask.
        eventLoop().execute(flushTask);
    } else {
        // Negative count. NOTE(review): presumably caused by a helper returning
        // WRITE_STATUS_SNDBUF_FULL, which would have to be larger than any spin count - confirm
        // in ChannelUtils. Ask epoll to signal when the socket becomes writable again.
        setFlag(Native.EPOLLOUT);
    }
}
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
470
471 Object msg = in.current();
472 if (msg instanceof ByteBuf) {
473 return writeBytes(in, (ByteBuf) msg);
474 } else if (msg instanceof DefaultFileRegion) {
475 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
476 } else if (msg instanceof FileRegion) {
477 return writeFileRegion(in, (FileRegion) msg);
478 } else if (msg instanceof SpliceOutTask) {
479 if (!((SpliceOutTask) msg).spliceOut()) {
480 return WRITE_STATUS_SNDBUF_FULL;
481 }
482 in.remove();
483 return 1;
484 } else {
485
486 throw new Error();
487 }
488 }
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
505 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
506 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
507 array.maxBytes(maxBytesPerGatheringWrite);
508 in.forEachFlushedMessage(array);
509
510 if (array.count() >= 1) {
511
512 return writeBytesMultiple(in, array);
513 }
514
515 in.removeBytes(0);
516 return 0;
517 }
518
519 @Override
520 protected Object filterOutboundMessage(Object msg) {
521 if (msg instanceof ByteBuf) {
522 ByteBuf buf = (ByteBuf) msg;
523 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
524 }
525
526 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
527 return msg;
528 }
529
530 throw new UnsupportedOperationException(
531 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
532 }
533
/**
 * Shuts down the output side of the socket.
 *
 * @throws Exception if the socket shutdown fails
 */
@Override
protected final void doShutdownOutput() throws Exception {
    // (false, true): leave the read side alone, shut down the write side.
    socket.shutdown(false, true);
}
538
/**
 * Shuts down the input side of the socket and completes {@code promise} with the outcome.
 *
 * @param promise succeeded when the shutdown call returns, failed with whatever it throws
 */
private void shutdownInput0(final ChannelPromise promise) {
    try {
        // (true, false): shut down the read side, leave the write side alone.
        socket.shutdown(true, false);
        promise.setSuccess();
    } catch (Throwable cause) {
        promise.setFailure(cause);
    }
}
547
/**
 * @return whatever the underlying socket reports for {@code isOutputShutdown()}
 */
@Override
public boolean isOutputShutdown() {
    return socket.isOutputShutdown();
}
552
/**
 * @return whatever the underlying socket reports for {@code isInputShutdown()}
 */
@Override
public boolean isInputShutdown() {
    return socket.isInputShutdown();
}
557
/**
 * @return whatever the underlying socket reports for {@code isShutdown()}
 */
@Override
public boolean isShutdown() {
    return socket.isShutdown();
}
562
563 @Override
564 public ChannelFuture shutdownOutput() {
565 return shutdownOutput(newPromise());
566 }
567
568 @Override
569 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
570 EventLoop loop = eventLoop();
571 if (loop.inEventLoop()) {
572 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
573 } else {
574 loop.execute(new Runnable() {
575 @Override
576 public void run() {
577 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
578 }
579 });
580 }
581
582 return promise;
583 }
584
585 @Override
586 public ChannelFuture shutdownInput() {
587 return shutdownInput(newPromise());
588 }
589
590 @Override
591 public ChannelFuture shutdownInput(final ChannelPromise promise) {
592 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
593 if (closeExecutor != null) {
594 closeExecutor.execute(new Runnable() {
595 @Override
596 public void run() {
597 shutdownInput0(promise);
598 }
599 });
600 } else {
601 EventLoop loop = eventLoop();
602 if (loop.inEventLoop()) {
603 shutdownInput0(promise);
604 } else {
605 loop.execute(new Runnable() {
606 @Override
607 public void run() {
608 shutdownInput0(promise);
609 }
610 });
611 }
612 }
613 return promise;
614 }
615
616 @Override
617 public ChannelFuture shutdown() {
618 return shutdown(newPromise());
619 }
620
621 @Override
622 public ChannelFuture shutdown(final ChannelPromise promise) {
623 ChannelFuture shutdownOutputFuture = shutdownOutput();
624 if (shutdownOutputFuture.isDone()) {
625 shutdownOutputDone(shutdownOutputFuture, promise);
626 } else {
627 shutdownOutputFuture.addListener(new ChannelFutureListener() {
628 @Override
629 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
630 shutdownOutputDone(shutdownOutputFuture, promise);
631 }
632 });
633 }
634 return promise;
635 }
636
637 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
638 ChannelFuture shutdownInputFuture = shutdownInput();
639 if (shutdownInputFuture.isDone()) {
640 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
641 } else {
642 shutdownInputFuture.addListener(new ChannelFutureListener() {
643 @Override
644 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
645 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
646 }
647 });
648 }
649 }
650
651 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
652 ChannelFuture shutdownInputFuture,
653 ChannelPromise promise) {
654 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
655 Throwable shutdownInputCause = shutdownInputFuture.cause();
656 if (shutdownOutputCause != null) {
657 if (shutdownInputCause != null) {
658 logger.debug("Exception suppressed because a previous exception occurred.",
659 shutdownInputCause);
660 }
661 promise.setFailure(shutdownOutputCause);
662 } else if (shutdownInputCause != null) {
663 promise.setFailure(shutdownInputCause);
664 } else {
665 promise.setSuccess();
666 }
667 }
668
/**
 * Closes the channel and then releases the splice resources: both pipe ends are closed and all
 * queued splice tasks are failed.
 *
 * @throws Exception if {@code super.doClose()} fails; the cleanup still runs in that case
 */
@Override
protected void doClose() throws Exception {
    try {
        // Close the channel first; spliceTo(...) checks isOpen() before and after queueing a task.
        super.doClose();
    } finally {
        safeClosePipe(pipeIn);
        safeClosePipe(pipeOut);
        // null: a ClosedChannelException is created lazily, only if there are tasks to fail.
        clearSpliceQueue(null);
    }
}
680
681 private void clearSpliceQueue(ClosedChannelException exception) {
682 Queue<SpliceInTask> sQueue = spliceQueue;
683 if (sQueue == null) {
684 return;
685 }
686 for (;;) {
687 SpliceInTask task = sQueue.poll();
688 if (task == null) {
689 break;
690 }
691 if (exception == null) {
692 exception = new ClosedChannelException();
693 }
694 task.promise.tryFailure(exception);
695 }
696 }
697
698 private static void safeClosePipe(FileDescriptor fd) {
699 if (fd != null) {
700 try {
701 fd.close();
702 } catch (IOException e) {
703 logger.warn("Error while closing a pipe", e);
704 }
705 }
706 }
707
/**
 * Unsafe implementation for stream channels. It reads bytes from the socket when epoll reports
 * readiness and runs queued splice tasks in front of regular reads.
 */
class EpollStreamUnsafe extends AbstractEpollUnsafe {
    // Pure delegation. NOTE(review): presumably overridden so that the enclosing channel class can
    // call it (see shutdownInput(ChannelPromise)) - confirm.
    @Override
    protected Executor prepareToClose() {
        return super.prepareToClose();
    }

    /**
     * Handles a failure raised while reading: passes on any bytes that were already read,
     * completes the read cycle, reports the exception and, where appropriate, shuts down the
     * input.
     *
     * @param pipeline    the pipeline to fire events on
     * @param byteBuf     the buffer in use when the failure happened, may be {@code null}
     * @param cause       the failure
     * @param close       whether the read loop had already decided to close
     * @param allocHandle the allocator handle of the current read cycle
     */
    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                                     EpollRecvByteAllocatorHandle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                // Data was read before the failure: hand it to the pipeline.
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);

        // Besides an explicit close request, an OutOfMemoryError or IOException also shuts down
        // the input.
        if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
            shutdownInput(false);
        }
    }

    /**
     * Wraps the given handle in an {@link EpollRecvByteAllocatorStreamingHandle}.
     */
    @Override
    EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
        return new EpollRecvByteAllocatorStreamingHandle(handle);
    }

    /**
     * Runs one read cycle: executes queued splice tasks first, otherwise reads bytes into newly
     * allocated buffers and fires them through the pipeline until the allocator handle says to
     * stop.
     */
    @Override
    void epollInReady() {
        final ChannelConfig config = config();
        if (shouldBreakEpollInReady(config)) {
            clearEpollIn0();
            return;
        }
        final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
        allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        allocHandle.reset(config);
        epollInBefore();

        ByteBuf byteBuf = null;
        boolean close = false;
        // Stays null when no splice queue exists; this selects the cleanup path in the finally block.
        Queue<SpliceInTask> sQueue = null;
        try {
            do {
                if (sQueue != null || (sQueue = spliceQueue) != null) {
                    SpliceInTask spliceTask = sQueue.peek();
                    if (spliceTask != null) {
                        boolean spliceInResult = spliceTask.spliceIn(allocHandle);

                        if (allocHandle.isReceivedRdHup()) {
                            shutdownInput(true);
                        }
                        if (spliceInResult) {
                            // The task reported completion. Only remove it while the channel is
                            // still active; doClose() drains the queue itself via
                            // clearSpliceQueue(...).
                            if (isActive()) {
                                sQueue.remove();
                            }
                            continue;
                        } else {
                            // The task is not finished yet; end this read cycle.
                            break;
                        }
                    }
                }

                // No splice task pending: do a regular read into a buffer from the allocator handle.
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // Nothing was read; release the unused buffer.
                    byteBuf.release();
                    byteBuf = null;
                    // A negative result marks the channel for input shutdown after the loop.
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // No further read is pending once we are about to close.
                        readPending = false;
                    }
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                // The pipeline owns the buffer now; make sure the catch block does not touch it.
                byteBuf = null;

                // Re-check after the user code invoked by fireChannelRead(...).
                // NOTE(review): presumably covers state changed by handlers, e.g. input shutdown
                // or disabled auto-read - confirm in shouldBreakEpollInReady.
                if (shouldBreakEpollInReady(config)) {
                    break;
                }
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                shutdownInput(false);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (sQueue == null) {
                epollInFinally(config);
            } else {
                // A splice queue exists: skip epollInFinally(...) and only stop reading when
                // auto-read is off.
                if (!config.isAutoRead()) {
                    clearEpollIn();
                }
            }
        }
    }
}
837
/**
 * Adds a splice task to the queue, creating the queue on first use.
 *
 * @param task the task to enqueue
 */
private void addToSpliceQueue(final SpliceInTask task) {
    Queue<SpliceInTask> sQueue = spliceQueue;
    if (sQueue == null) {
        // Lazy creation: re-read the volatile field while holding the lock so that only one
        // queue is ever created when several threads race here.
        synchronized (this) {
            sQueue = spliceQueue;
            if (sQueue == null) {
                spliceQueue = sQueue = PlatformDependent.newMpscQueue();
            }
        }
    }
    sQueue.add(task);
}
850
851 protected abstract class SpliceInTask {
852 final ChannelPromise promise;
853 int len;
854
855 protected SpliceInTask(int len, ChannelPromise promise) {
856 this.promise = promise;
857 this.len = len;
858 }
859
860 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
861
862 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
863
864 int length = Math.min(handle.guess(), len);
865 int splicedIn = 0;
866 for (;;) {
867
868 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
869 handle.lastBytesRead(localSplicedIn);
870 if (localSplicedIn == 0) {
871 break;
872 }
873 splicedIn += localSplicedIn;
874 length -= localSplicedIn;
875 }
876
877 return splicedIn;
878 }
879 }
880
881
/**
 * Splice task that moves data from this channel's socket into the pipe of the target channel
 * {@code ch} and then queues a {@code SpliceOutTask} on the target to drain that pipe into the
 * target's socket. It also listens on the intermediate write promises so that a failed write
 * fails the overall splice promise.
 */
private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
    private final AbstractEpollStreamChannel ch;

    SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
        super(len, promise);
        this.ch = ch;
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            // An intermediate splice-out write failed: fail the overall splice promise.
            promise.tryFailure(future.cause());
        }
    }

    @Override
    public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
        assert ch.eventLoop().inEventLoop();
        if (len == 0) {
            // Nothing (left) to splice.
            promise.trySuccess();
            return true;
        }
        try {
            // The pipe belongs to the target channel and is created on first use. It is closed
            // in the target's doClose().
            FileDescriptor pipeOut = ch.pipeOut;
            if (pipeOut == null) {
                // pipe[0] is stored as the end data is later spliced out of, pipe[1] as the end
                // it is spliced into.
                FileDescriptor[] pipe = pipe();
                ch.pipeIn = pipe[0];
                pipeOut = ch.pipeOut = pipe[1];
            }

            int splicedIn = spliceIn(pipeOut, handle);
            if (splicedIn > 0) {
                // Integer.MAX_VALUE is never decremented, so such a task never finishes by count.
                if (len != Integer.MAX_VALUE) {
                    len -= splicedIn;
                }

                // The last chunk completes the caller's promise directly; earlier chunks use a
                // fresh promise that only reports failures back through operationComplete(...).
                final ChannelPromise splicePromise;
                if (len == 0) {
                    splicePromise = promise;
                } else {
                    splicePromise = ch.newPromise().addListener(this);
                }

                boolean autoRead = config().isAutoRead();

                // Write and flush through the target's unsafe directly instead of through its
                // pipeline.
                ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
                ch.unsafe().flush();
                if (autoRead && !splicePromise.isDone()) {
                    // The chunk has not been written out yet. Stop reading from this channel
                    // until SpliceOutTask switches auto-read back on.
                    config().setAutoRead(false);
                }
            }

            return len == 0;
        } catch (Throwable cause) {
            // Report any failure through the promise and mark the task as finished.
            promise.tryFailure(cause);
            return true;
        }
    }
}
957
958 private final class SpliceOutTask {
959 private final AbstractEpollStreamChannel ch;
960 private final boolean autoRead;
961 private int len;
962
963 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
964 this.ch = ch;
965 this.len = len;
966 this.autoRead = autoRead;
967 }
968
969 public boolean spliceOut() throws Exception {
970 assert ch.eventLoop().inEventLoop();
971 try {
972 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
973 len -= splicedOut;
974 if (len == 0) {
975 if (autoRead) {
976
977 config().setAutoRead(true);
978 }
979 return true;
980 }
981 return false;
982 } catch (IOException e) {
983 if (autoRead) {
984
985 config().setAutoRead(true);
986 }
987 throw e;
988 }
989 }
990 }
991
992 private final class SpliceFdTask extends SpliceInTask {
993 private final FileDescriptor fd;
994 private final ChannelPromise promise;
995 private int offset;
996
997 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
998 super(len, promise);
999 this.fd = fd;
1000 this.promise = promise;
1001 this.offset = offset;
1002 }
1003
1004 @Override
1005 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1006 assert eventLoop().inEventLoop();
1007 if (len == 0) {
1008
1009 promise.trySuccess();
1010 return true;
1011 }
1012
1013 try {
1014 FileDescriptor[] pipe = pipe();
1015 FileDescriptor pipeIn = pipe[0];
1016 FileDescriptor pipeOut = pipe[1];
1017 try {
1018 int splicedIn = spliceIn(pipeOut, handle);
1019 if (splicedIn > 0) {
1020
1021 if (len != Integer.MAX_VALUE) {
1022 len -= splicedIn;
1023 }
1024 do {
1025 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1026 offset += splicedOut;
1027 splicedIn -= splicedOut;
1028 } while (splicedIn > 0);
1029 if (len == 0) {
1030
1031 promise.trySuccess();
1032 return true;
1033 }
1034 }
1035 return false;
1036 } finally {
1037 safeClosePipe(pipeIn);
1038 safeClosePipe(pipeOut);
1039 }
1040 } catch (Throwable cause) {
1041
1042 promise.tryFailure(cause);
1043 return true;
1044 }
1045 }
1046 }
1047
/**
 * Adapter, built on {@link SocketWritableByteChannel}, that sends written bytes to this channel's
 * socket. {@code writeFileRegion(...)} passes it to {@code FileRegion.transferTo(...)}.
 */
private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
    EpollSocketWritableByteChannel() {
        super(socket);
        // Sanity check: the inherited fd must be the very socket used by write(...) below.
        assert fd == socket;
    }

    /**
     * Sends the given range of {@code buf} through the socket.
     *
     * @return the result of {@code socket.send(...)}
     * @throws IOException if the send fails
     */
    @Override
    protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
        return socket.send(buf, pos, limit);
    }

    /**
     * @return the allocator of the enclosing channel
     */
    @Override
    protected ByteBufAllocator alloc() {
        return AbstractEpollStreamChannel.this.alloc();
    }
}
1064 }