1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.UnstableApi;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60 private static final String EXPECTED_TYPES =
61 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64
65 private final Runnable flushTask = new Runnable() {
66 @Override
67 public void run() {
68
69
70 ((AbstractEpollUnsafe) unsafe()).flush0();
71 }
72 };
73
74
75 private volatile Queue<SpliceInTask> spliceQueue;
76 private FileDescriptor pipeIn;
77 private FileDescriptor pipeOut;
78
79 private WritableByteChannel byteChannel;
80
81 protected AbstractEpollStreamChannel(Channel parent, int fd) {
82 this(parent, new LinuxSocket(fd));
83 }
84
85 protected AbstractEpollStreamChannel(int fd) {
86 this(new LinuxSocket(fd));
87 }
88
89 AbstractEpollStreamChannel(LinuxSocket fd) {
90 this(fd, isSoErrorZero(fd));
91 }
92
93 AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
94 super(parent, fd, true);
95
96 flags |= Native.EPOLLRDHUP;
97 }
98
99 AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
100 super(parent, fd, remote);
101
102 flags |= Native.EPOLLRDHUP;
103 }
104
105 protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
106 super(null, fd, active);
107
108 flags |= Native.EPOLLRDHUP;
109 }
110
111 @Override
112 protected AbstractEpollUnsafe newUnsafe() {
113 return new EpollStreamUnsafe();
114 }
115
116 @Override
117 public ChannelMetadata metadata() {
118 return METADATA;
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136 return spliceTo(ch, len, newPromise());
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154 final ChannelPromise promise) {
155 if (ch.eventLoop() != eventLoop()) {
156 throw new IllegalArgumentException("EventLoops are not the same.");
157 }
158 checkPositiveOrZero(len, "len");
159 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162 }
163 checkNotNull(promise, "promise");
164 if (!isOpen()) {
165 promise.tryFailure(new ClosedChannelException());
166 } else {
167 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168 failSpliceIfClosed(promise);
169 }
170 return promise;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188 return spliceTo(ch, offset, len, newPromise());
189 }
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206 final ChannelPromise promise) {
207 checkPositiveOrZero(len, "len");
208 checkPositiveOrZero(offset, "offset");
209 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211 }
212 checkNotNull(promise, "promise");
213 if (!isOpen()) {
214 promise.tryFailure(new ClosedChannelException());
215 } else {
216 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217 failSpliceIfClosed(promise);
218 }
219 return promise;
220 }
221
222 private void failSpliceIfClosed(ChannelPromise promise) {
223 if (!isOpen()) {
224
225
226 if (promise.tryFailure(new ClosedChannelException())) {
227 eventLoop().execute(new Runnable() {
228 @Override
229 public void run() {
230
231 clearSpliceQueue();
232 }
233 });
234 }
235 }
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
253 int readableBytes = buf.readableBytes();
254 if (readableBytes == 0) {
255 in.remove();
256 return 0;
257 }
258
259 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
260 return doWriteBytes(in, buf);
261 } else {
262 ByteBuffer[] nioBuffers = buf.nioBuffers();
263 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
264 config().getMaxBytesPerGatheringWrite());
265 }
266 }
267
268 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
269
270
271
272 if (attempted == written) {
273 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
274 config().setMaxBytesPerGatheringWrite(attempted << 1);
275 }
276 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
277 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
297 final long expectedWrittenBytes = array.size();
298 assert expectedWrittenBytes != 0;
299 final int cnt = array.count();
300 assert cnt != 0;
301
302 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
303 if (localWrittenBytes > 0) {
304 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
305 in.removeBytes(localWrittenBytes);
306 return 1;
307 }
308 return WRITE_STATUS_SNDBUF_FULL;
309 }
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 private int writeBytesMultiple(
330 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
331 long maxBytesPerGatheringWrite) throws IOException {
332 assert expectedWrittenBytes != 0;
333 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
334 expectedWrittenBytes = maxBytesPerGatheringWrite;
335 }
336
337 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
338 if (localWrittenBytes > 0) {
339 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
340 in.removeBytes(localWrittenBytes);
341 return 1;
342 }
343 return WRITE_STATUS_SNDBUF_FULL;
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
361 final long offset = region.transferred();
362 final long regionCount = region.count();
363 if (offset >= regionCount) {
364 in.remove();
365 return 0;
366 }
367
368 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
369 if (flushedAmount > 0) {
370 in.progress(flushedAmount);
371 if (region.transferred() >= regionCount) {
372 in.remove();
373 }
374 return 1;
375 } else if (flushedAmount == 0) {
376 validateFileRegion(region, offset);
377 }
378 return WRITE_STATUS_SNDBUF_FULL;
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
396 if (region.transferred() >= region.count()) {
397 in.remove();
398 return 0;
399 }
400
401 if (byteChannel == null) {
402 byteChannel = new EpollSocketWritableByteChannel();
403 }
404 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
405 if (flushedAmount > 0) {
406 in.progress(flushedAmount);
407 if (region.transferred() >= region.count()) {
408 in.remove();
409 }
410 return 1;
411 }
412 return WRITE_STATUS_SNDBUF_FULL;
413 }
414
415 @Override
416 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
417 int writeSpinCount = config().getWriteSpinCount();
418 do {
419 final int msgCount = in.size();
420
421 if (msgCount > 1 && in.current() instanceof ByteBuf) {
422 writeSpinCount -= doWriteMultiple(in);
423 } else if (msgCount == 0) {
424
425 clearFlag(Native.EPOLLOUT);
426
427 return;
428 } else {
429 writeSpinCount -= doWriteSingle(in);
430 }
431
432
433
434
435 } while (writeSpinCount > 0);
436
437 if (writeSpinCount == 0) {
438
439
440
441
442 clearFlag(Native.EPOLLOUT);
443
444
445 eventLoop().execute(flushTask);
446 } else {
447
448
449 setFlag(Native.EPOLLOUT);
450 }
451 }
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
468
469 Object msg = in.current();
470 if (msg instanceof ByteBuf) {
471 return writeBytes(in, (ByteBuf) msg);
472 } else if (msg instanceof DefaultFileRegion) {
473 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
474 } else if (msg instanceof FileRegion) {
475 return writeFileRegion(in, (FileRegion) msg);
476 } else if (msg instanceof SpliceOutTask) {
477 if (!((SpliceOutTask) msg).spliceOut()) {
478 return WRITE_STATUS_SNDBUF_FULL;
479 }
480 in.remove();
481 return 1;
482 } else {
483
484 throw new Error();
485 }
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
503 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
504 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
505 array.maxBytes(maxBytesPerGatheringWrite);
506 in.forEachFlushedMessage(array);
507
508 if (array.count() >= 1) {
509
510 return writeBytesMultiple(in, array);
511 }
512
513 in.removeBytes(0);
514 return 0;
515 }
516
517 @Override
518 protected Object filterOutboundMessage(Object msg) {
519 if (msg instanceof ByteBuf) {
520 ByteBuf buf = (ByteBuf) msg;
521 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
522 }
523
524 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
525 return msg;
526 }
527
528 throw new UnsupportedOperationException(
529 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
530 }
531
532 @UnstableApi
533 @Override
534 protected final void doShutdownOutput() throws Exception {
535 socket.shutdown(false, true);
536 }
537
538 private void shutdownInput0(final ChannelPromise promise) {
539 try {
540 socket.shutdown(true, false);
541 promise.setSuccess();
542 } catch (Throwable cause) {
543 promise.setFailure(cause);
544 }
545 }
546
547 @Override
548 public boolean isOutputShutdown() {
549 return socket.isOutputShutdown();
550 }
551
552 @Override
553 public boolean isInputShutdown() {
554 return socket.isInputShutdown();
555 }
556
557 @Override
558 public boolean isShutdown() {
559 return socket.isShutdown();
560 }
561
562 @Override
563 public ChannelFuture shutdownOutput() {
564 return shutdownOutput(newPromise());
565 }
566
567 @Override
568 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
569 EventLoop loop = eventLoop();
570 if (loop.inEventLoop()) {
571 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
572 } else {
573 loop.execute(new Runnable() {
574 @Override
575 public void run() {
576 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
577 }
578 });
579 }
580
581 return promise;
582 }
583
584 @Override
585 public ChannelFuture shutdownInput() {
586 return shutdownInput(newPromise());
587 }
588
589 @Override
590 public ChannelFuture shutdownInput(final ChannelPromise promise) {
591 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
592 if (closeExecutor != null) {
593 closeExecutor.execute(new Runnable() {
594 @Override
595 public void run() {
596 shutdownInput0(promise);
597 }
598 });
599 } else {
600 EventLoop loop = eventLoop();
601 if (loop.inEventLoop()) {
602 shutdownInput0(promise);
603 } else {
604 loop.execute(new Runnable() {
605 @Override
606 public void run() {
607 shutdownInput0(promise);
608 }
609 });
610 }
611 }
612 return promise;
613 }
614
615 @Override
616 public ChannelFuture shutdown() {
617 return shutdown(newPromise());
618 }
619
620 @Override
621 public ChannelFuture shutdown(final ChannelPromise promise) {
622 ChannelFuture shutdownOutputFuture = shutdownOutput();
623 if (shutdownOutputFuture.isDone()) {
624 shutdownOutputDone(shutdownOutputFuture, promise);
625 } else {
626 shutdownOutputFuture.addListener(new ChannelFutureListener() {
627 @Override
628 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
629 shutdownOutputDone(shutdownOutputFuture, promise);
630 }
631 });
632 }
633 return promise;
634 }
635
636 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
637 ChannelFuture shutdownInputFuture = shutdownInput();
638 if (shutdownInputFuture.isDone()) {
639 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
640 } else {
641 shutdownInputFuture.addListener(new ChannelFutureListener() {
642 @Override
643 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
644 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
645 }
646 });
647 }
648 }
649
650 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
651 ChannelFuture shutdownInputFuture,
652 ChannelPromise promise) {
653 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
654 Throwable shutdownInputCause = shutdownInputFuture.cause();
655 if (shutdownOutputCause != null) {
656 if (shutdownInputCause != null) {
657 logger.debug("Exception suppressed because a previous exception occurred.",
658 shutdownInputCause);
659 }
660 promise.setFailure(shutdownOutputCause);
661 } else if (shutdownInputCause != null) {
662 promise.setFailure(shutdownInputCause);
663 } else {
664 promise.setSuccess();
665 }
666 }
667
668 @Override
669 protected void doClose() throws Exception {
670 try {
671
672 super.doClose();
673 } finally {
674 safeClosePipe(pipeIn);
675 safeClosePipe(pipeOut);
676 clearSpliceQueue();
677 }
678 }
679
680 private void clearSpliceQueue() {
681 Queue<SpliceInTask> sQueue = spliceQueue;
682 if (sQueue == null) {
683 return;
684 }
685 ClosedChannelException exception = null;
686
687 for (;;) {
688 SpliceInTask task = sQueue.poll();
689 if (task == null) {
690 break;
691 }
692 if (exception == null) {
693 exception = new ClosedChannelException();
694 }
695 task.promise.tryFailure(exception);
696 }
697 }
698
699 private static void safeClosePipe(FileDescriptor fd) {
700 if (fd != null) {
701 try {
702 fd.close();
703 } catch (IOException e) {
704 logger.warn("Error while closing a pipe", e);
705 }
706 }
707 }
708
709 class EpollStreamUnsafe extends AbstractEpollUnsafe {
710
711 @Override
712 protected Executor prepareToClose() {
713 return super.prepareToClose();
714 }
715
716 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
717 EpollRecvByteAllocatorHandle allocHandle) {
718 if (byteBuf != null) {
719 if (byteBuf.isReadable()) {
720 readPending = false;
721 pipeline.fireChannelRead(byteBuf);
722 } else {
723 byteBuf.release();
724 }
725 }
726 allocHandle.readComplete();
727 pipeline.fireChannelReadComplete();
728 pipeline.fireExceptionCaught(cause);
729
730
731
732 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
733 shutdownInput(false);
734 }
735 }
736
737 @Override
738 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
739 return new EpollRecvByteAllocatorStreamingHandle(handle);
740 }
741
742 @Override
743 void epollInReady() {
744 final ChannelConfig config = config();
745 if (shouldBreakEpollInReady(config)) {
746 clearEpollIn0();
747 return;
748 }
749 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
750 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
751
752 final ChannelPipeline pipeline = pipeline();
753 final ByteBufAllocator allocator = config.getAllocator();
754 allocHandle.reset(config);
755 epollInBefore();
756
757 ByteBuf byteBuf = null;
758 boolean close = false;
759 Queue<SpliceInTask> sQueue = null;
760 try {
761 do {
762 if (sQueue != null || (sQueue = spliceQueue) != null) {
763 SpliceInTask spliceTask = sQueue.peek();
764 if (spliceTask != null) {
765 boolean spliceInResult = spliceTask.spliceIn(allocHandle);
766
767 if (allocHandle.isReceivedRdHup()) {
768 shutdownInput(true);
769 }
770 if (spliceInResult) {
771
772
773 if (isActive()) {
774 sQueue.remove();
775 }
776 continue;
777 } else {
778 break;
779 }
780 }
781 }
782
783
784
785 byteBuf = allocHandle.allocate(allocator);
786 allocHandle.lastBytesRead(doReadBytes(byteBuf));
787 if (allocHandle.lastBytesRead() <= 0) {
788
789 byteBuf.release();
790 byteBuf = null;
791 close = allocHandle.lastBytesRead() < 0;
792 if (close) {
793
794 readPending = false;
795 }
796 break;
797 }
798 allocHandle.incMessagesRead(1);
799 readPending = false;
800 pipeline.fireChannelRead(byteBuf);
801 byteBuf = null;
802
803 if (shouldBreakEpollInReady(config)) {
804
805
806
807
808
809
810
811
812
813
814
815 break;
816 }
817 } while (allocHandle.continueReading());
818
819 allocHandle.readComplete();
820 pipeline.fireChannelReadComplete();
821
822 if (close) {
823 shutdownInput(false);
824 }
825 } catch (Throwable t) {
826 handleReadException(pipeline, byteBuf, t, close, allocHandle);
827 } finally {
828 if (sQueue == null) {
829 epollInFinally(config);
830 } else {
831 if (!config.isAutoRead()) {
832 clearEpollIn();
833 }
834 }
835 }
836 }
837 }
838
839 private void addToSpliceQueue(final SpliceInTask task) {
840 Queue<SpliceInTask> sQueue = spliceQueue;
841 if (sQueue == null) {
842 synchronized (this) {
843 sQueue = spliceQueue;
844 if (sQueue == null) {
845 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
846 }
847 }
848 }
849 sQueue.add(task);
850 }
851
852 protected abstract class SpliceInTask {
853 final ChannelPromise promise;
854 int len;
855
856 protected SpliceInTask(int len, ChannelPromise promise) {
857 this.promise = promise;
858 this.len = len;
859 }
860
861 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
862
863 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
864
865 int length = Math.min(handle.guess(), len);
866 int splicedIn = 0;
867 for (;;) {
868
869 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
870 handle.lastBytesRead(localSplicedIn);
871 if (localSplicedIn == 0) {
872 break;
873 }
874 splicedIn += localSplicedIn;
875 length -= localSplicedIn;
876 }
877
878 return splicedIn;
879 }
880 }
881
882
883 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
884 private final AbstractEpollStreamChannel ch;
885
886 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
887 super(len, promise);
888 this.ch = ch;
889 }
890
891 @Override
892 public void operationComplete(ChannelFuture future) throws Exception {
893 if (!future.isSuccess()) {
894 promise.setFailure(future.cause());
895 }
896 }
897
898 @Override
899 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
900 assert ch.eventLoop().inEventLoop();
901 if (len == 0) {
902 promise.setSuccess();
903 return true;
904 }
905 try {
906
907
908
909 FileDescriptor pipeOut = ch.pipeOut;
910 if (pipeOut == null) {
911
912 FileDescriptor[] pipe = pipe();
913 ch.pipeIn = pipe[0];
914 pipeOut = ch.pipeOut = pipe[1];
915 }
916
917 int splicedIn = spliceIn(pipeOut, handle);
918 if (splicedIn > 0) {
919
920 if (len != Integer.MAX_VALUE) {
921 len -= splicedIn;
922 }
923
924
925
926 final ChannelPromise splicePromise;
927 if (len == 0) {
928 splicePromise = promise;
929 } else {
930 splicePromise = ch.newPromise().addListener(this);
931 }
932
933 boolean autoRead = config().isAutoRead();
934
935
936
937 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
938 ch.unsafe().flush();
939 if (autoRead && !splicePromise.isDone()) {
940
941
942
943
944 config().setAutoRead(false);
945 }
946 }
947
948 return len == 0;
949 } catch (Throwable cause) {
950 promise.setFailure(cause);
951 return true;
952 }
953 }
954 }
955
956 private final class SpliceOutTask {
957 private final AbstractEpollStreamChannel ch;
958 private final boolean autoRead;
959 private int len;
960
961 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
962 this.ch = ch;
963 this.len = len;
964 this.autoRead = autoRead;
965 }
966
967 public boolean spliceOut() throws Exception {
968 assert ch.eventLoop().inEventLoop();
969 try {
970 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
971 len -= splicedOut;
972 if (len == 0) {
973 if (autoRead) {
974
975 config().setAutoRead(true);
976 }
977 return true;
978 }
979 return false;
980 } catch (IOException e) {
981 if (autoRead) {
982
983 config().setAutoRead(true);
984 }
985 throw e;
986 }
987 }
988 }
989
990 private final class SpliceFdTask extends SpliceInTask {
991 private final FileDescriptor fd;
992 private final ChannelPromise promise;
993 private int offset;
994
995 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
996 super(len, promise);
997 this.fd = fd;
998 this.promise = promise;
999 this.offset = offset;
1000 }
1001
1002 @Override
1003 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1004 assert eventLoop().inEventLoop();
1005 if (len == 0) {
1006 promise.setSuccess();
1007 return true;
1008 }
1009
1010 try {
1011 FileDescriptor[] pipe = pipe();
1012 FileDescriptor pipeIn = pipe[0];
1013 FileDescriptor pipeOut = pipe[1];
1014 try {
1015 int splicedIn = spliceIn(pipeOut, handle);
1016 if (splicedIn > 0) {
1017
1018 if (len != Integer.MAX_VALUE) {
1019 len -= splicedIn;
1020 }
1021 do {
1022 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1023 offset += splicedOut;
1024 splicedIn -= splicedOut;
1025 } while (splicedIn > 0);
1026 if (len == 0) {
1027 promise.setSuccess();
1028 return true;
1029 }
1030 }
1031 return false;
1032 } finally {
1033 safeClosePipe(pipeIn);
1034 safeClosePipe(pipeOut);
1035 }
1036 } catch (Throwable cause) {
1037 promise.setFailure(cause);
1038 return true;
1039 }
1040 }
1041 }
1042
1043 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1044 EpollSocketWritableByteChannel() {
1045 super(socket);
1046 }
1047
1048 @Override
1049 protected ByteBufAllocator alloc() {
1050 return AbstractEpollStreamChannel.this.alloc();
1051 }
1052 }
1053 }