1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.LeakPresenceDetector;
39 import io.netty.util.internal.PlatformDependent;
40 import io.netty.util.internal.StringUtil;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57 import static io.netty.util.internal.StringUtil.className;
58
59 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
60 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
61 private static final String EXPECTED_TYPES =
62 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
63 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
64 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
65
66 private final Runnable flushTask = new Runnable() {
67 @Override
68 public void run() {
69
70
71 ((AbstractEpollUnsafe) unsafe()).flush0();
72 }
73 };
74
75
76 private volatile Queue<SpliceInTask> spliceQueue;
77 private FileDescriptor pipeIn;
78 private FileDescriptor pipeOut;
79
80 private WritableByteChannel byteChannel;
81
82 protected AbstractEpollStreamChannel(Channel parent, int fd) {
83 this(parent, new LinuxSocket(fd));
84 }
85
86 protected AbstractEpollStreamChannel(int fd) {
87 this(new LinuxSocket(fd));
88 }
89
90 AbstractEpollStreamChannel(LinuxSocket fd) {
91 this(fd, isSoErrorZero(fd));
92 }
93
94 AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
95
96 super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
97 }
98
99 protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
100
101 super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
102 }
103
104 protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
105
106 super(null, fd, active, EpollIoOps.EPOLLRDHUP);
107 }
108
109 @Override
110 protected AbstractEpollUnsafe newUnsafe() {
111 return new EpollStreamUnsafe();
112 }
113
114 @Override
115 public ChannelMetadata metadata() {
116 return METADATA;
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 @Deprecated
134 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
135 return spliceTo(ch, len, newPromise());
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 @Deprecated
153 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154 final ChannelPromise promise) {
155 if (ch.eventLoop() != eventLoop()) {
156 throw new IllegalArgumentException("EventLoops are not the same.");
157 }
158 checkPositiveOrZero(len, "len");
159 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162 }
163 checkNotNull(promise, "promise");
164 if (!isOpen()) {
165 promise.tryFailure(new ClosedChannelException());
166 } else {
167 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168 failSpliceIfClosed(promise);
169 }
170 return promise;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 @Deprecated
189 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
190 return spliceTo(ch, offset, len, newPromise());
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 @Deprecated
209 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
210 final ChannelPromise promise) {
211 checkPositiveOrZero(len, "len");
212 checkPositiveOrZero(offset, "offset");
213 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
214 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
215 }
216 checkNotNull(promise, "promise");
217 if (!isOpen()) {
218 promise.tryFailure(new ClosedChannelException());
219 } else {
220 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
221 failSpliceIfClosed(promise);
222 }
223 return promise;
224 }
225
226 private void failSpliceIfClosed(ChannelPromise promise) {
227 if (!isOpen()) {
228
229
230 if (!promise.isDone()) {
231 final ClosedChannelException ex = new ClosedChannelException();
232 if (promise.tryFailure(ex)) {
233 eventLoop().execute(new Runnable() {
234 @Override
235 public void run() {
236
237 clearSpliceQueue(ex);
238 }
239 });
240 }
241 }
242 }
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
260 int readableBytes = buf.readableBytes();
261 if (readableBytes == 0) {
262 in.remove();
263 return 0;
264 }
265
266 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
267 return doWriteBytes(in, buf);
268 } else {
269 ByteBuffer[] nioBuffers = buf.nioBuffers();
270 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
271 config().getMaxBytesPerGatheringWrite());
272 }
273 }
274
275 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
276
277
278
279 if (attempted == written) {
280 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
281 config().setMaxBytesPerGatheringWrite(attempted << 1);
282 }
283 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
284 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
285 }
286 }
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
304 final long expectedWrittenBytes = array.size();
305 assert expectedWrittenBytes != 0;
306 final int cnt = array.count();
307 assert cnt != 0;
308
309 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
310 if (localWrittenBytes > 0) {
311 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
312 in.removeBytes(localWrittenBytes);
313 return 1;
314 }
315 return WRITE_STATUS_SNDBUF_FULL;
316 }
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336 private int writeBytesMultiple(
337 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
338 long maxBytesPerGatheringWrite) throws IOException {
339 assert expectedWrittenBytes != 0;
340 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
341 expectedWrittenBytes = maxBytesPerGatheringWrite;
342 }
343
344 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
345 if (localWrittenBytes > 0) {
346 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
347 in.removeBytes(localWrittenBytes);
348 return 1;
349 }
350 return WRITE_STATUS_SNDBUF_FULL;
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
368 final long offset = region.transferred();
369 final long regionCount = region.count();
370 if (offset >= regionCount) {
371 in.remove();
372 return 0;
373 }
374
375 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
376 if (flushedAmount > 0) {
377 in.progress(flushedAmount);
378 if (region.transferred() >= regionCount) {
379 in.remove();
380 }
381 return 1;
382 } else if (flushedAmount == 0) {
383 validateFileRegion(region, offset);
384 }
385 return WRITE_STATUS_SNDBUF_FULL;
386 }
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
403 if (region.transferred() >= region.count()) {
404 in.remove();
405 return 0;
406 }
407
408 if (byteChannel == null) {
409 byteChannel = new EpollSocketWritableByteChannel();
410 }
411 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
412 if (flushedAmount > 0) {
413 in.progress(flushedAmount);
414 if (region.transferred() >= region.count()) {
415 in.remove();
416 }
417 return 1;
418 }
419 return WRITE_STATUS_SNDBUF_FULL;
420 }
421
422 @Override
423 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
424 int writeSpinCount = config().getWriteSpinCount();
425 do {
426 final int msgCount = in.size();
427
428 if (msgCount > 1 && in.current() instanceof ByteBuf) {
429 writeSpinCount -= doWriteMultiple(in);
430 } else if (msgCount == 0) {
431
432 clearFlag(Native.EPOLLOUT);
433
434 return;
435 } else {
436 writeSpinCount -= doWriteSingle(in);
437 }
438
439
440
441
442 } while (writeSpinCount > 0);
443
444 if (writeSpinCount == 0) {
445
446
447
448
449 clearFlag(Native.EPOLLOUT);
450
451
452 eventLoop().execute(flushTask);
453 } else {
454
455
456 setFlag(Native.EPOLLOUT);
457 }
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
475
476 Object msg = in.current();
477 if (msg instanceof ByteBuf) {
478 return writeBytes(in, (ByteBuf) msg);
479 } else if (msg instanceof DefaultFileRegion) {
480 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
481 } else if (msg instanceof FileRegion) {
482 return writeFileRegion(in, (FileRegion) msg);
483 } else if (msg instanceof SpliceOutTask) {
484 if (!((SpliceOutTask) msg).spliceOut()) {
485 return WRITE_STATUS_SNDBUF_FULL;
486 }
487 in.remove();
488 return 1;
489 } else {
490
491 throw new Error("Unexpected message type: " + className(msg));
492 }
493 }
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
510 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
511 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
512 array.maxBytes(maxBytesPerGatheringWrite);
513 in.forEachFlushedMessage(array);
514
515 if (array.count() >= 1) {
516
517 return writeBytesMultiple(in, array);
518 }
519
520 in.removeBytes(0);
521 return 0;
522 }
523
524 @Override
525 protected Object filterOutboundMessage(Object msg) {
526 if (msg instanceof ByteBuf) {
527 ByteBuf buf = (ByteBuf) msg;
528 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
529 }
530
531 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
532 return msg;
533 }
534
535 throw new UnsupportedOperationException(
536 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
537 }
538
539 @Override
540 protected final void doShutdownOutput() throws Exception {
541 socket.shutdown(false, true);
542 }
543
544 private void shutdownInput0(final ChannelPromise promise) {
545 try {
546 socket.shutdown(true, false);
547 promise.setSuccess();
548 } catch (Throwable cause) {
549 promise.setFailure(cause);
550 }
551 }
552
553 @Override
554 public boolean isOutputShutdown() {
555 return socket.isOutputShutdown();
556 }
557
558 @Override
559 public boolean isInputShutdown() {
560 return socket.isInputShutdown();
561 }
562
563 @Override
564 public boolean isShutdown() {
565 return socket.isShutdown();
566 }
567
568 @Override
569 public ChannelFuture shutdownOutput() {
570 return shutdownOutput(newPromise());
571 }
572
573 @Override
574 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
575 EventLoop loop = eventLoop();
576 if (loop.inEventLoop()) {
577 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
578 } else {
579 loop.execute(new Runnable() {
580 @Override
581 public void run() {
582 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
583 }
584 });
585 }
586
587 return promise;
588 }
589
590 @Override
591 public ChannelFuture shutdownInput() {
592 return shutdownInput(newPromise());
593 }
594
595 @Override
596 public ChannelFuture shutdownInput(final ChannelPromise promise) {
597 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
598 if (closeExecutor != null) {
599 closeExecutor.execute(new Runnable() {
600 @Override
601 public void run() {
602 shutdownInput0(promise);
603 }
604 });
605 } else {
606 EventLoop loop = eventLoop();
607 if (loop.inEventLoop()) {
608 shutdownInput0(promise);
609 } else {
610 loop.execute(new Runnable() {
611 @Override
612 public void run() {
613 shutdownInput0(promise);
614 }
615 });
616 }
617 }
618 return promise;
619 }
620
621 @Override
622 public ChannelFuture shutdown() {
623 return shutdown(newPromise());
624 }
625
626 @Override
627 public ChannelFuture shutdown(final ChannelPromise promise) {
628 ChannelFuture shutdownOutputFuture = shutdownOutput();
629 if (shutdownOutputFuture.isDone()) {
630 shutdownOutputDone(shutdownOutputFuture, promise);
631 } else {
632 shutdownOutputFuture.addListener(new ChannelFutureListener() {
633 @Override
634 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
635 shutdownOutputDone(shutdownOutputFuture, promise);
636 }
637 });
638 }
639 return promise;
640 }
641
642 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
643 ChannelFuture shutdownInputFuture = shutdownInput();
644 if (shutdownInputFuture.isDone()) {
645 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
646 } else {
647 shutdownInputFuture.addListener(new ChannelFutureListener() {
648 @Override
649 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
650 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
651 }
652 });
653 }
654 }
655
656 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
657 ChannelFuture shutdownInputFuture,
658 ChannelPromise promise) {
659 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
660 Throwable shutdownInputCause = shutdownInputFuture.cause();
661 if (shutdownOutputCause != null) {
662 if (shutdownInputCause != null) {
663 logger.debug("Exception suppressed because a previous exception occurred.",
664 shutdownInputCause);
665 }
666 promise.setFailure(shutdownOutputCause);
667 } else if (shutdownInputCause != null) {
668 promise.setFailure(shutdownInputCause);
669 } else {
670 promise.setSuccess();
671 }
672 }
673
674 @Override
675 protected void doClose() throws Exception {
676 try {
677
678 super.doClose();
679 } finally {
680 safeClosePipe(pipeIn);
681 safeClosePipe(pipeOut);
682 clearSpliceQueue(null);
683 }
684 }
685
686 private void clearSpliceQueue(ClosedChannelException exception) {
687 Queue<SpliceInTask> sQueue = spliceQueue;
688 if (sQueue == null) {
689 return;
690 }
691 for (;;) {
692 SpliceInTask task = sQueue.poll();
693 if (task == null) {
694 break;
695 }
696 if (exception == null) {
697 exception = new ClosedChannelException();
698 }
699 task.promise.tryFailure(exception);
700 }
701 }
702
703 private static void safeClosePipe(FileDescriptor fd) {
704 if (fd != null) {
705 try {
706 fd.close();
707 } catch (IOException e) {
708 logger.warn("Error while closing a pipe", e);
709 }
710 }
711 }
712
713 class EpollStreamUnsafe extends AbstractEpollUnsafe {
714
715 @Override
716 protected Executor prepareToClose() {
717 return super.prepareToClose();
718 }
719
720 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
721 boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
722 if (byteBuf != null) {
723 if (byteBuf.isReadable()) {
724 readPending = false;
725 pipeline.fireChannelRead(byteBuf);
726 } else {
727 byteBuf.release();
728 }
729 }
730 allocHandle.readComplete();
731 pipeline.fireChannelReadComplete();
732 pipeline.fireExceptionCaught(cause);
733
734
735
736 if (allDataRead ||
737 cause instanceof OutOfMemoryError ||
738 cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
739 cause instanceof IOException) {
740 shutdownInput(true);
741 }
742 }
743
744 @Override
745 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
746 return new EpollRecvByteAllocatorStreamingHandle(handle);
747 }
748
749 @Override
750 void epollInReady() {
751 final ChannelConfig config = config();
752 if (shouldBreakEpollInReady(config)) {
753 clearEpollIn0();
754 return;
755 }
756 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
757 final ChannelPipeline pipeline = pipeline();
758 final ByteBufAllocator allocator = config.getAllocator();
759 allocHandle.reset(config);
760
761 ByteBuf byteBuf = null;
762 boolean allDataRead = false;
763 Queue<SpliceInTask> sQueue = null;
764 try {
765 do {
766 if (sQueue != null || (sQueue = spliceQueue) != null) {
767 SpliceInTask spliceTask = sQueue.peek();
768 if (spliceTask != null) {
769 boolean spliceInResult = spliceTask.spliceIn(allocHandle);
770
771 if (allocHandle.isReceivedRdHup()) {
772 shutdownInput(false);
773 }
774 if (spliceInResult) {
775
776
777 if (isActive()) {
778 sQueue.remove();
779 }
780 continue;
781 } else {
782 break;
783 }
784 }
785 }
786
787
788
789 byteBuf = allocHandle.allocate(allocator);
790 allocHandle.lastBytesRead(doReadBytes(byteBuf));
791 if (allocHandle.lastBytesRead() <= 0) {
792
793 byteBuf.release();
794 byteBuf = null;
795 allDataRead = allocHandle.lastBytesRead() < 0;
796 if (allDataRead) {
797
798 readPending = false;
799 }
800 break;
801 }
802 allocHandle.incMessagesRead(1);
803 readPending = false;
804 pipeline.fireChannelRead(byteBuf);
805 byteBuf = null;
806
807 if (shouldBreakEpollInReady(config)) {
808
809
810
811
812
813
814
815
816
817
818
819 break;
820 }
821 } while (allocHandle.continueReading());
822
823 allocHandle.readComplete();
824 pipeline.fireChannelReadComplete();
825
826 if (allDataRead) {
827 shutdownInput(true);
828 }
829 } catch (Throwable t) {
830 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
831 } finally {
832 if (sQueue == null) {
833 if (shouldStopReading(config)) {
834 clearEpollIn();
835 }
836 } else {
837 if (!config.isAutoRead()) {
838 clearEpollIn();
839 }
840 }
841 }
842 }
843 }
844
845 private void addToSpliceQueue(final SpliceInTask task) {
846 Queue<SpliceInTask> sQueue = spliceQueue;
847 if (sQueue == null) {
848 synchronized (this) {
849 sQueue = spliceQueue;
850 if (sQueue == null) {
851 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
852 }
853 }
854 }
855 sQueue.add(task);
856 }
857
858 protected abstract class SpliceInTask {
859 final ChannelPromise promise;
860 int len;
861
862 protected SpliceInTask(int len, ChannelPromise promise) {
863 this.promise = promise;
864 this.len = len;
865 }
866
867 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
868
869 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
870
871 int length = Math.min(handle.guess(), len);
872 int splicedIn = 0;
873 for (;;) {
874
875 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
876 handle.lastBytesRead(localSplicedIn);
877 if (localSplicedIn == 0) {
878 break;
879 }
880 splicedIn += localSplicedIn;
881 length -= localSplicedIn;
882 }
883
884 return splicedIn;
885 }
886 }
887
888
889 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
890 private final AbstractEpollStreamChannel ch;
891
892 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
893 super(len, promise);
894 this.ch = ch;
895 }
896
897 @Override
898 public void operationComplete(ChannelFuture future) throws Exception {
899 if (!future.isSuccess()) {
900
901 promise.tryFailure(future.cause());
902 }
903 }
904
905 @Override
906 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
907 assert ch.eventLoop().inEventLoop();
908 if (len == 0) {
909
910 promise.trySuccess();
911 return true;
912 }
913 try {
914
915
916
917 FileDescriptor pipeOut = ch.pipeOut;
918 if (pipeOut == null) {
919
920 FileDescriptor[] pipe = pipe();
921 ch.pipeIn = pipe[0];
922 pipeOut = ch.pipeOut = pipe[1];
923 }
924
925 int splicedIn = spliceIn(pipeOut, handle);
926 if (splicedIn > 0) {
927
928 if (len != Integer.MAX_VALUE) {
929 len -= splicedIn;
930 }
931
932
933
934 final ChannelPromise splicePromise;
935 if (len == 0) {
936 splicePromise = promise;
937 } else {
938 splicePromise = ch.newPromise().addListener(this);
939 }
940
941 boolean autoRead = config().isAutoRead();
942
943
944
945 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
946 ch.unsafe().flush();
947 if (autoRead && !splicePromise.isDone()) {
948
949
950
951
952 config().setAutoRead(false);
953 }
954 }
955
956 return len == 0;
957 } catch (Throwable cause) {
958
959 promise.tryFailure(cause);
960 return true;
961 }
962 }
963 }
964
965 private final class SpliceOutTask {
966 private final AbstractEpollStreamChannel ch;
967 private final boolean autoRead;
968 private int len;
969
970 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
971 this.ch = ch;
972 this.len = len;
973 this.autoRead = autoRead;
974 }
975
976 public boolean spliceOut() throws Exception {
977 assert ch.eventLoop().inEventLoop();
978 try {
979 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
980 len -= splicedOut;
981 if (len == 0) {
982 if (autoRead) {
983
984 config().setAutoRead(true);
985 }
986 return true;
987 }
988 return false;
989 } catch (IOException e) {
990 if (autoRead) {
991
992 config().setAutoRead(true);
993 }
994 throw e;
995 }
996 }
997 }
998
999 private final class SpliceFdTask extends SpliceInTask {
1000 private final FileDescriptor fd;
1001 private final ChannelPromise promise;
1002 private int offset;
1003
1004 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1005 super(len, promise);
1006 this.fd = fd;
1007 this.promise = promise;
1008 this.offset = offset;
1009 }
1010
1011 @Override
1012 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1013 assert eventLoop().inEventLoop();
1014 if (len == 0) {
1015
1016 promise.trySuccess();
1017 return true;
1018 }
1019
1020 try {
1021 FileDescriptor[] pipe = pipe();
1022 FileDescriptor pipeIn = pipe[0];
1023 FileDescriptor pipeOut = pipe[1];
1024 try {
1025 int splicedIn = spliceIn(pipeOut, handle);
1026 if (splicedIn > 0) {
1027
1028 if (len != Integer.MAX_VALUE) {
1029 len -= splicedIn;
1030 }
1031 do {
1032 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1033 offset += splicedOut;
1034 splicedIn -= splicedOut;
1035 } while (splicedIn > 0);
1036 if (len == 0) {
1037
1038 promise.trySuccess();
1039 return true;
1040 }
1041 }
1042 return false;
1043 } finally {
1044 safeClosePipe(pipeIn);
1045 safeClosePipe(pipeOut);
1046 }
1047 } catch (Throwable cause) {
1048
1049 promise.tryFailure(cause);
1050 return true;
1051 }
1052 }
1053 }
1054
1055 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1056 EpollSocketWritableByteChannel() {
1057 super(socket);
1058 assert fd == socket;
1059 }
1060
1061 @Override
1062 protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1063 return socket.send(buf, pos, limit);
1064 }
1065
1066 @Override
1067 protected ByteBufAllocator alloc() {
1068 return AbstractEpollStreamChannel.this.alloc();
1069 }
1070 }
1071 }