1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.WritableByteChannel;
48 import java.util.Queue;
49 import java.util.concurrent.Executor;
50
51 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53 import static io.netty.channel.unix.FileDescriptor.pipe;
54 import static io.netty.util.internal.ObjectUtil.checkNotNull;
55 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56
57 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
58 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
59 private static final String EXPECTED_TYPES =
60 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
61 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
62 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63
64 private final Runnable flushTask = new Runnable() {
65 @Override
66 public void run() {
67
68
69 ((AbstractEpollUnsafe) unsafe()).flush0();
70 }
71 };
72
73
74 private volatile Queue<SpliceInTask> spliceQueue;
75 private FileDescriptor pipeIn;
76 private FileDescriptor pipeOut;
77
78 private WritableByteChannel byteChannel;
79
80 protected AbstractEpollStreamChannel(Channel parent, int fd) {
81 this(parent, new LinuxSocket(fd));
82 }
83
84 protected AbstractEpollStreamChannel(int fd) {
85 this(new LinuxSocket(fd));
86 }
87
88 AbstractEpollStreamChannel(LinuxSocket fd) {
89 this(fd, isSoErrorZero(fd));
90 }
91
92 AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
93 super(parent, fd, true);
94
95 flags |= Native.EPOLLRDHUP;
96 }
97
98 protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
99 super(parent, fd, remote);
100
101 flags |= Native.EPOLLRDHUP;
102 }
103
104 protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
105 super(null, fd, active);
106
107 flags |= Native.EPOLLRDHUP;
108 }
109
110 @Override
111 protected AbstractEpollUnsafe newUnsafe() {
112 return new EpollStreamUnsafe();
113 }
114
115 @Override
116 public ChannelMetadata metadata() {
117 return METADATA;
118 }
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 @Deprecated
135 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136 return spliceTo(ch, len, newPromise());
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 @Deprecated
154 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
155 final ChannelPromise promise) {
156 if (ch.eventLoop() != eventLoop()) {
157 throw new IllegalArgumentException("EventLoops are not the same.");
158 }
159 checkPositiveOrZero(len, "len");
160 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
161 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
162 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
163 }
164 checkNotNull(promise, "promise");
165 if (!isOpen()) {
166 promise.tryFailure(new ClosedChannelException());
167 } else {
168 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
169 failSpliceIfClosed(promise);
170 }
171 return promise;
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 @Deprecated
190 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
191 return spliceTo(ch, offset, len, newPromise());
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 @Deprecated
210 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
211 final ChannelPromise promise) {
212 checkPositiveOrZero(len, "len");
213 checkPositiveOrZero(offset, "offset");
214 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
215 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
216 }
217 checkNotNull(promise, "promise");
218 if (!isOpen()) {
219 promise.tryFailure(new ClosedChannelException());
220 } else {
221 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
222 failSpliceIfClosed(promise);
223 }
224 return promise;
225 }
226
227 private void failSpliceIfClosed(ChannelPromise promise) {
228 if (!isOpen()) {
229
230
231 if (!promise.isDone()) {
232 final ClosedChannelException ex = new ClosedChannelException();
233 if (promise.tryFailure(ex)) {
234 eventLoop().execute(new Runnable() {
235 @Override
236 public void run() {
237
238 clearSpliceQueue(ex);
239 }
240 });
241 }
242 }
243 }
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
261 int readableBytes = buf.readableBytes();
262 if (readableBytes == 0) {
263 in.remove();
264 return 0;
265 }
266
267 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
268 return doWriteBytes(in, buf);
269 } else {
270 ByteBuffer[] nioBuffers = buf.nioBuffers();
271 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
272 config().getMaxBytesPerGatheringWrite());
273 }
274 }
275
276 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
277
278
279
280 if (attempted == written) {
281 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
282 config().setMaxBytesPerGatheringWrite(attempted << 1);
283 }
284 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
285 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
286 }
287 }
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
305 final long expectedWrittenBytes = array.size();
306 assert expectedWrittenBytes != 0;
307 final int cnt = array.count();
308 assert cnt != 0;
309
310 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
311 if (localWrittenBytes > 0) {
312 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
313 in.removeBytes(localWrittenBytes);
314 return 1;
315 }
316 return WRITE_STATUS_SNDBUF_FULL;
317 }
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337 private int writeBytesMultiple(
338 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
339 long maxBytesPerGatheringWrite) throws IOException {
340 assert expectedWrittenBytes != 0;
341 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
342 expectedWrittenBytes = maxBytesPerGatheringWrite;
343 }
344
345 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
346 if (localWrittenBytes > 0) {
347 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
348 in.removeBytes(localWrittenBytes);
349 return 1;
350 }
351 return WRITE_STATUS_SNDBUF_FULL;
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
369 final long offset = region.transferred();
370 final long regionCount = region.count();
371 if (offset >= regionCount) {
372 in.remove();
373 return 0;
374 }
375
376 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
377 if (flushedAmount > 0) {
378 in.progress(flushedAmount);
379 if (region.transferred() >= regionCount) {
380 in.remove();
381 }
382 return 1;
383 } else if (flushedAmount == 0) {
384 validateFileRegion(region, offset);
385 }
386 return WRITE_STATUS_SNDBUF_FULL;
387 }
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
404 if (region.transferred() >= region.count()) {
405 in.remove();
406 return 0;
407 }
408
409 if (byteChannel == null) {
410 byteChannel = new EpollSocketWritableByteChannel();
411 }
412 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
413 if (flushedAmount > 0) {
414 in.progress(flushedAmount);
415 if (region.transferred() >= region.count()) {
416 in.remove();
417 }
418 return 1;
419 }
420 return WRITE_STATUS_SNDBUF_FULL;
421 }
422
423 @Override
424 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
425 int writeSpinCount = config().getWriteSpinCount();
426 do {
427 final int msgCount = in.size();
428
429 if (msgCount > 1 && in.current() instanceof ByteBuf) {
430 writeSpinCount -= doWriteMultiple(in);
431 } else if (msgCount == 0) {
432
433 clearFlag(Native.EPOLLOUT);
434
435 return;
436 } else {
437 writeSpinCount -= doWriteSingle(in);
438 }
439
440
441
442
443 } while (writeSpinCount > 0);
444
445 if (writeSpinCount == 0) {
446
447
448
449
450 clearFlag(Native.EPOLLOUT);
451
452
453 eventLoop().execute(flushTask);
454 } else {
455
456
457 setFlag(Native.EPOLLOUT);
458 }
459 }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
476
477 Object msg = in.current();
478 if (msg instanceof ByteBuf) {
479 return writeBytes(in, (ByteBuf) msg);
480 } else if (msg instanceof DefaultFileRegion) {
481 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
482 } else if (msg instanceof FileRegion) {
483 return writeFileRegion(in, (FileRegion) msg);
484 } else if (msg instanceof SpliceOutTask) {
485 if (!((SpliceOutTask) msg).spliceOut()) {
486 return WRITE_STATUS_SNDBUF_FULL;
487 }
488 in.remove();
489 return 1;
490 } else {
491
492 throw new Error();
493 }
494 }
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
511 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
512 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
513 array.maxBytes(maxBytesPerGatheringWrite);
514 in.forEachFlushedMessage(array);
515
516 if (array.count() >= 1) {
517
518 return writeBytesMultiple(in, array);
519 }
520
521 in.removeBytes(0);
522 return 0;
523 }
524
525 @Override
526 protected Object filterOutboundMessage(Object msg) {
527 if (msg instanceof ByteBuf) {
528 ByteBuf buf = (ByteBuf) msg;
529 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
530 }
531
532 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
533 return msg;
534 }
535
536 throw new UnsupportedOperationException(
537 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
538 }
539
540 @Override
541 protected final void doShutdownOutput() throws Exception {
542 socket.shutdown(false, true);
543 }
544
545 private void shutdownInput0(final ChannelPromise promise) {
546 try {
547 socket.shutdown(true, false);
548 promise.setSuccess();
549 } catch (Throwable cause) {
550 promise.setFailure(cause);
551 }
552 }
553
554 @Override
555 public boolean isOutputShutdown() {
556 return socket.isOutputShutdown();
557 }
558
559 @Override
560 public boolean isInputShutdown() {
561 return socket.isInputShutdown();
562 }
563
564 @Override
565 public boolean isShutdown() {
566 return socket.isShutdown();
567 }
568
569 @Override
570 public ChannelFuture shutdownOutput() {
571 return shutdownOutput(newPromise());
572 }
573
574 @Override
575 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
576 EventLoop loop = eventLoop();
577 if (loop.inEventLoop()) {
578 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
579 } else {
580 loop.execute(new Runnable() {
581 @Override
582 public void run() {
583 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
584 }
585 });
586 }
587
588 return promise;
589 }
590
591 @Override
592 public ChannelFuture shutdownInput() {
593 return shutdownInput(newPromise());
594 }
595
596 @Override
597 public ChannelFuture shutdownInput(final ChannelPromise promise) {
598 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
599 if (closeExecutor != null) {
600 closeExecutor.execute(new Runnable() {
601 @Override
602 public void run() {
603 shutdownInput0(promise);
604 }
605 });
606 } else {
607 EventLoop loop = eventLoop();
608 if (loop.inEventLoop()) {
609 shutdownInput0(promise);
610 } else {
611 loop.execute(new Runnable() {
612 @Override
613 public void run() {
614 shutdownInput0(promise);
615 }
616 });
617 }
618 }
619 return promise;
620 }
621
622 @Override
623 public ChannelFuture shutdown() {
624 return shutdown(newPromise());
625 }
626
627 @Override
628 public ChannelFuture shutdown(final ChannelPromise promise) {
629 ChannelFuture shutdownOutputFuture = shutdownOutput();
630 if (shutdownOutputFuture.isDone()) {
631 shutdownOutputDone(shutdownOutputFuture, promise);
632 } else {
633 shutdownOutputFuture.addListener(new ChannelFutureListener() {
634 @Override
635 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
636 shutdownOutputDone(shutdownOutputFuture, promise);
637 }
638 });
639 }
640 return promise;
641 }
642
643 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
644 ChannelFuture shutdownInputFuture = shutdownInput();
645 if (shutdownInputFuture.isDone()) {
646 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
647 } else {
648 shutdownInputFuture.addListener(new ChannelFutureListener() {
649 @Override
650 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
651 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
652 }
653 });
654 }
655 }
656
657 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
658 ChannelFuture shutdownInputFuture,
659 ChannelPromise promise) {
660 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
661 Throwable shutdownInputCause = shutdownInputFuture.cause();
662 if (shutdownOutputCause != null) {
663 if (shutdownInputCause != null) {
664 logger.debug("Exception suppressed because a previous exception occurred.",
665 shutdownInputCause);
666 }
667 promise.setFailure(shutdownOutputCause);
668 } else if (shutdownInputCause != null) {
669 promise.setFailure(shutdownInputCause);
670 } else {
671 promise.setSuccess();
672 }
673 }
674
675 @Override
676 protected void doClose() throws Exception {
677 try {
678
679 super.doClose();
680 } finally {
681 safeClosePipe(pipeIn);
682 safeClosePipe(pipeOut);
683 clearSpliceQueue(null);
684 }
685 }
686
687 private void clearSpliceQueue(ClosedChannelException exception) {
688 Queue<SpliceInTask> sQueue = spliceQueue;
689 if (sQueue == null) {
690 return;
691 }
692 for (;;) {
693 SpliceInTask task = sQueue.poll();
694 if (task == null) {
695 break;
696 }
697 if (exception == null) {
698 exception = new ClosedChannelException();
699 }
700 task.promise.tryFailure(exception);
701 }
702 }
703
704 private static void safeClosePipe(FileDescriptor fd) {
705 if (fd != null) {
706 try {
707 fd.close();
708 } catch (IOException e) {
709 logger.warn("Error while closing a pipe", e);
710 }
711 }
712 }
713
714 class EpollStreamUnsafe extends AbstractEpollUnsafe {
715
716 @Override
717 protected Executor prepareToClose() {
718 return super.prepareToClose();
719 }
720
721 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
722 EpollRecvByteAllocatorHandle allocHandle) {
723 if (byteBuf != null) {
724 if (byteBuf.isReadable()) {
725 readPending = false;
726 pipeline.fireChannelRead(byteBuf);
727 } else {
728 byteBuf.release();
729 }
730 }
731 allocHandle.readComplete();
732 pipeline.fireChannelReadComplete();
733 pipeline.fireExceptionCaught(cause);
734
735
736
737 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
738 shutdownInput(false);
739 }
740 }
741
742 @Override
743 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
744 return new EpollRecvByteAllocatorStreamingHandle(handle);
745 }
746
747 @Override
748 void epollInReady() {
749 final ChannelConfig config = config();
750 if (shouldBreakEpollInReady(config)) {
751 clearEpollIn0();
752 return;
753 }
754 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
755 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
756
757 final ChannelPipeline pipeline = pipeline();
758 final ByteBufAllocator allocator = config.getAllocator();
759 allocHandle.reset(config);
760 epollInBefore();
761
762 ByteBuf byteBuf = null;
763 boolean close = false;
764 Queue<SpliceInTask> sQueue = null;
765 try {
766 do {
767 if (sQueue != null || (sQueue = spliceQueue) != null) {
768 SpliceInTask spliceTask = sQueue.peek();
769 if (spliceTask != null) {
770 boolean spliceInResult = spliceTask.spliceIn(allocHandle);
771
772 if (allocHandle.isReceivedRdHup()) {
773 shutdownInput(true);
774 }
775 if (spliceInResult) {
776
777
778 if (isActive()) {
779 sQueue.remove();
780 }
781 continue;
782 } else {
783 break;
784 }
785 }
786 }
787
788
789
790 byteBuf = allocHandle.allocate(allocator);
791 allocHandle.lastBytesRead(doReadBytes(byteBuf));
792 if (allocHandle.lastBytesRead() <= 0) {
793
794 byteBuf.release();
795 byteBuf = null;
796 close = allocHandle.lastBytesRead() < 0;
797 if (close) {
798
799 readPending = false;
800 }
801 break;
802 }
803 allocHandle.incMessagesRead(1);
804 readPending = false;
805 pipeline.fireChannelRead(byteBuf);
806 byteBuf = null;
807
808 if (shouldBreakEpollInReady(config)) {
809
810
811
812
813
814
815
816
817
818
819
820 break;
821 }
822 } while (allocHandle.continueReading());
823
824 allocHandle.readComplete();
825 pipeline.fireChannelReadComplete();
826
827 if (close) {
828 shutdownInput(false);
829 }
830 } catch (Throwable t) {
831 handleReadException(pipeline, byteBuf, t, close, allocHandle);
832 } finally {
833 if (sQueue == null) {
834 epollInFinally(config);
835 } else {
836 if (!config.isAutoRead()) {
837 clearEpollIn();
838 }
839 }
840 }
841 }
842 }
843
844 private void addToSpliceQueue(final SpliceInTask task) {
845 Queue<SpliceInTask> sQueue = spliceQueue;
846 if (sQueue == null) {
847 synchronized (this) {
848 sQueue = spliceQueue;
849 if (sQueue == null) {
850 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
851 }
852 }
853 }
854 sQueue.add(task);
855 }
856
857 protected abstract class SpliceInTask {
858 final ChannelPromise promise;
859 int len;
860
861 protected SpliceInTask(int len, ChannelPromise promise) {
862 this.promise = promise;
863 this.len = len;
864 }
865
866 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
867
868 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
869
870 int length = Math.min(handle.guess(), len);
871 int splicedIn = 0;
872 for (;;) {
873
874 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
875 handle.lastBytesRead(localSplicedIn);
876 if (localSplicedIn == 0) {
877 break;
878 }
879 splicedIn += localSplicedIn;
880 length -= localSplicedIn;
881 }
882
883 return splicedIn;
884 }
885 }
886
887
888 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
889 private final AbstractEpollStreamChannel ch;
890
891 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
892 super(len, promise);
893 this.ch = ch;
894 }
895
896 @Override
897 public void operationComplete(ChannelFuture future) throws Exception {
898 if (!future.isSuccess()) {
899
900 promise.tryFailure(future.cause());
901 }
902 }
903
904 @Override
905 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
906 assert ch.eventLoop().inEventLoop();
907 if (len == 0) {
908
909 promise.trySuccess();
910 return true;
911 }
912 try {
913
914
915
916 FileDescriptor pipeOut = ch.pipeOut;
917 if (pipeOut == null) {
918
919 FileDescriptor[] pipe = pipe();
920 ch.pipeIn = pipe[0];
921 pipeOut = ch.pipeOut = pipe[1];
922 }
923
924 int splicedIn = spliceIn(pipeOut, handle);
925 if (splicedIn > 0) {
926
927 if (len != Integer.MAX_VALUE) {
928 len -= splicedIn;
929 }
930
931
932
933 final ChannelPromise splicePromise;
934 if (len == 0) {
935 splicePromise = promise;
936 } else {
937 splicePromise = ch.newPromise().addListener(this);
938 }
939
940 boolean autoRead = config().isAutoRead();
941
942
943
944 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
945 ch.unsafe().flush();
946 if (autoRead && !splicePromise.isDone()) {
947
948
949
950
951 config().setAutoRead(false);
952 }
953 }
954
955 return len == 0;
956 } catch (Throwable cause) {
957
958 promise.tryFailure(cause);
959 return true;
960 }
961 }
962 }
963
964 private final class SpliceOutTask {
965 private final AbstractEpollStreamChannel ch;
966 private final boolean autoRead;
967 private int len;
968
969 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
970 this.ch = ch;
971 this.len = len;
972 this.autoRead = autoRead;
973 }
974
975 public boolean spliceOut() throws Exception {
976 assert ch.eventLoop().inEventLoop();
977 try {
978 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
979 len -= splicedOut;
980 if (len == 0) {
981 if (autoRead) {
982
983 config().setAutoRead(true);
984 }
985 return true;
986 }
987 return false;
988 } catch (IOException e) {
989 if (autoRead) {
990
991 config().setAutoRead(true);
992 }
993 throw e;
994 }
995 }
996 }
997
998 private final class SpliceFdTask extends SpliceInTask {
999 private final FileDescriptor fd;
1000 private final ChannelPromise promise;
1001 private int offset;
1002
1003 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1004 super(len, promise);
1005 this.fd = fd;
1006 this.promise = promise;
1007 this.offset = offset;
1008 }
1009
1010 @Override
1011 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1012 assert eventLoop().inEventLoop();
1013 if (len == 0) {
1014
1015 promise.trySuccess();
1016 return true;
1017 }
1018
1019 try {
1020 FileDescriptor[] pipe = pipe();
1021 FileDescriptor pipeIn = pipe[0];
1022 FileDescriptor pipeOut = pipe[1];
1023 try {
1024 int splicedIn = spliceIn(pipeOut, handle);
1025 if (splicedIn > 0) {
1026
1027 if (len != Integer.MAX_VALUE) {
1028 len -= splicedIn;
1029 }
1030 do {
1031 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1032 offset += splicedOut;
1033 splicedIn -= splicedOut;
1034 } while (splicedIn > 0);
1035 if (len == 0) {
1036
1037 promise.trySuccess();
1038 return true;
1039 }
1040 }
1041 return false;
1042 } finally {
1043 safeClosePipe(pipeIn);
1044 safeClosePipe(pipeOut);
1045 }
1046 } catch (Throwable cause) {
1047
1048 promise.tryFailure(cause);
1049 return true;
1050 }
1051 }
1052 }
1053
1054 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1055 EpollSocketWritableByteChannel() {
1056 super(socket);
1057 assert fd == socket;
1058 }
1059
1060 @Override
1061 protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1062 return socket.send(buf, pos, limit);
1063 }
1064
1065 @Override
1066 protected ByteBufAllocator alloc() {
1067 return AbstractEpollStreamChannel.this.alloc();
1068 }
1069 }
1070 }