1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelConfig;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPipeline;
28 import io.netty.channel.ChannelPromise;
29 import io.netty.channel.DefaultFileRegion;
30 import io.netty.channel.EventLoop;
31 import io.netty.channel.FileRegion;
32 import io.netty.channel.RecvByteBufAllocator;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.Socket;
36 import io.netty.util.internal.PlatformDependent;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.ThrowableUtil;
39 import io.netty.util.internal.UnstableApi;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.WritableByteChannel;
48 import java.util.Queue;
49 import java.util.concurrent.Executor;
50
51 import static io.netty.channel.unix.FileDescriptor.pipe;
52 import static io.netty.util.internal.ObjectUtil.checkNotNull;
53
54 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
55
56 private static final String EXPECTED_TYPES =
57 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
58 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
60 private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
61 ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
62 AbstractEpollStreamChannel.class, "clearSpliceQueue()");
63 private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
64 new ClosedChannelException(),
65 AbstractEpollStreamChannel.class, "spliceTo(...)");
66 private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
67 ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
68 AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
69
70 private Queue<SpliceInTask> spliceQueue;
71
72
73 private FileDescriptor pipeIn;
74 private FileDescriptor pipeOut;
75
76 private WritableByteChannel byteChannel;
77
78
79
80
81 @Deprecated
82 protected AbstractEpollStreamChannel(Channel parent, int fd) {
83 this(parent, new Socket(fd));
84 }
85
86
87
88
89 @Deprecated
90 protected AbstractEpollStreamChannel(int fd) {
91 this(new Socket(fd));
92 }
93
94
95
96
97 @Deprecated
98 protected AbstractEpollStreamChannel(FileDescriptor fd) {
99 this(new Socket(fd.intValue()));
100 }
101
102
103
104
105 @Deprecated
106 protected AbstractEpollStreamChannel(Socket fd) {
107 this(fd, isSoErrorZero(fd));
108 }
109
110 protected AbstractEpollStreamChannel(Channel parent, Socket fd) {
111 super(parent, fd, Native.EPOLLIN, true);
112
113 flags |= Native.EPOLLRDHUP;
114 }
115
116 AbstractEpollStreamChannel(Channel parent, Socket fd, SocketAddress remote) {
117 super(parent, fd, Native.EPOLLIN, remote);
118
119 flags |= Native.EPOLLRDHUP;
120 }
121
122 protected AbstractEpollStreamChannel(Socket fd, boolean active) {
123 super(null, fd, Native.EPOLLIN, active);
124
125 flags |= Native.EPOLLRDHUP;
126 }
127
128 @Override
129 protected AbstractEpollUnsafe newUnsafe() {
130 return new EpollStreamUnsafe();
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
148 return spliceTo(ch, len, newPromise());
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
166 final ChannelPromise promise) {
167 if (ch.eventLoop() != eventLoop()) {
168 throw new IllegalArgumentException("EventLoops are not the same.");
169 }
170 if (len < 0) {
171 throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
172 }
173 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
174 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
175 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
176 }
177 checkNotNull(promise, "promise");
178 if (!isOpen()) {
179 promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
180 } else {
181 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
182 failSpliceIfClosed(promise);
183 }
184 return promise;
185 }
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
202 return spliceTo(ch, offset, len, newPromise());
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
220 final ChannelPromise promise) {
221 if (len < 0) {
222 throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
223 }
224 if (offset < 0) {
225 throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
226 }
227 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
228 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
229 }
230 checkNotNull(promise, "promise");
231 if (!isOpen()) {
232 promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
233 } else {
234 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
235 failSpliceIfClosed(promise);
236 }
237 return promise;
238 }
239
240 private void failSpliceIfClosed(ChannelPromise promise) {
241 if (!isOpen()) {
242
243
244 if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
245 eventLoop().execute(new Runnable() {
246 @Override
247 public void run() {
248
249 clearSpliceQueue();
250 }
251 });
252 }
253 }
254 }
255
256
257
258
259
260 private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
261 int readableBytes = buf.readableBytes();
262 if (readableBytes == 0) {
263 in.remove();
264 return true;
265 }
266
267 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
268 int writtenBytes = doWriteBytes(buf, writeSpinCount);
269 in.removeBytes(writtenBytes);
270 return writtenBytes == readableBytes;
271 } else {
272 ByteBuffer[] nioBuffers = buf.nioBuffers();
273 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
274 }
275 }
276
277 private boolean writeBytesMultiple(
278 ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {
279
280 long expectedWrittenBytes = array.size();
281 final long initialExpectedWrittenBytes = expectedWrittenBytes;
282
283 int cnt = array.count();
284
285 assert expectedWrittenBytes != 0;
286 assert cnt != 0;
287
288 boolean done = false;
289 int offset = 0;
290 int end = offset + cnt;
291 for (int i = writeSpinCount - 1; i >= 0; i--) {
292 long localWrittenBytes = fd().writevAddresses(array.memoryAddress(offset), cnt);
293 if (localWrittenBytes == 0) {
294 break;
295 }
296 expectedWrittenBytes -= localWrittenBytes;
297
298 if (expectedWrittenBytes == 0) {
299
300 done = true;
301 break;
302 }
303
304 do {
305 long bytes = array.processWritten(offset, localWrittenBytes);
306 if (bytes == -1) {
307
308 break;
309 } else {
310 offset++;
311 cnt--;
312 localWrittenBytes -= bytes;
313 }
314 } while (offset < end && localWrittenBytes > 0);
315 }
316 in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
317 return done;
318 }
319
320 private boolean writeBytesMultiple(
321 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
322 int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException {
323
324 assert expectedWrittenBytes != 0;
325 final long initialExpectedWrittenBytes = expectedWrittenBytes;
326
327 boolean done = false;
328 int offset = 0;
329 int end = offset + nioBufferCnt;
330 for (int i = writeSpinCount - 1; i >= 0; i--) {
331 long localWrittenBytes = fd().writev(nioBuffers, offset, nioBufferCnt);
332 if (localWrittenBytes == 0) {
333 break;
334 }
335 expectedWrittenBytes -= localWrittenBytes;
336
337 if (expectedWrittenBytes == 0) {
338
339 done = true;
340 break;
341 }
342 do {
343 ByteBuffer buffer = nioBuffers[offset];
344 int pos = buffer.position();
345 int bytes = buffer.limit() - pos;
346 if (bytes > localWrittenBytes) {
347 buffer.position(pos + (int) localWrittenBytes);
348
349 break;
350 } else {
351 offset++;
352 nioBufferCnt--;
353 localWrittenBytes -= bytes;
354 }
355 } while (offset < end && localWrittenBytes > 0);
356 }
357
358 in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
359 return done;
360 }
361
362
363
364
365
366
367
368 private boolean writeDefaultFileRegion(
369 ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
370 final long regionCount = region.count();
371 if (region.transfered() >= regionCount) {
372 in.remove();
373 return true;
374 }
375
376 final long baseOffset = region.position();
377 boolean done = false;
378 long flushedAmount = 0;
379
380 for (int i = writeSpinCount - 1; i >= 0; i--) {
381 final long offset = region.transfered();
382 final long localFlushedAmount =
383 Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
384 if (localFlushedAmount == 0) {
385 break;
386 }
387
388 flushedAmount += localFlushedAmount;
389 if (region.transfered() >= regionCount) {
390 done = true;
391 break;
392 }
393 }
394
395 if (flushedAmount > 0) {
396 in.progress(flushedAmount);
397 }
398
399 if (done) {
400 in.remove();
401 }
402 return done;
403 }
404
405 private boolean writeFileRegion(
406 ChannelOutboundBuffer in, FileRegion region, final int writeSpinCount) throws Exception {
407 if (region.transfered() >= region.count()) {
408 in.remove();
409 return true;
410 }
411
412 boolean done = false;
413 long flushedAmount = 0;
414
415 if (byteChannel == null) {
416 byteChannel = new SocketWritableByteChannel();
417 }
418 for (int i = writeSpinCount - 1; i >= 0; i--) {
419 final long localFlushedAmount = region.transferTo(byteChannel, region.transfered());
420 if (localFlushedAmount == 0) {
421 break;
422 }
423
424 flushedAmount += localFlushedAmount;
425 if (region.transfered() >= region.count()) {
426 done = true;
427 break;
428 }
429 }
430
431 if (flushedAmount > 0) {
432 in.progress(flushedAmount);
433 }
434
435 if (done) {
436 in.remove();
437 }
438 return done;
439 }
440
441 @Override
442 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
443 int writeSpinCount = config().getWriteSpinCount();
444 for (;;) {
445 final int msgCount = in.size();
446
447 if (msgCount == 0) {
448
449 clearFlag(Native.EPOLLOUT);
450
451 return;
452 }
453
454
455 if (msgCount > 1 && in.current() instanceof ByteBuf) {
456 if (!doWriteMultiple(in, writeSpinCount)) {
457
458 break;
459 }
460
461
462
463
464 } else {
465 if (!doWriteSingle(in, writeSpinCount)) {
466
467 break;
468 }
469 }
470 }
471
472
473 setFlag(Native.EPOLLOUT);
474 }
475
476 protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
477
478 Object msg = in.current();
479 if (msg instanceof ByteBuf) {
480 if (!writeBytes(in, (ByteBuf) msg, writeSpinCount)) {
481
482
483 return false;
484 }
485 } else if (msg instanceof DefaultFileRegion) {
486 if (!writeDefaultFileRegion(in, (DefaultFileRegion) msg, writeSpinCount)) {
487
488
489 return false;
490 }
491 } else if (msg instanceof FileRegion) {
492 if (!writeFileRegion(in, (FileRegion) msg, writeSpinCount)) {
493
494
495 return false;
496 }
497 } else if (msg instanceof SpliceOutTask) {
498 if (!((SpliceOutTask) msg).spliceOut()) {
499 return false;
500 }
501 in.remove();
502 } else {
503
504 throw new Error();
505 }
506
507 return true;
508 }
509
510 private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
511 if (PlatformDependent.hasUnsafe()) {
512
513 IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
514 in.forEachFlushedMessage(array);
515
516 int cnt = array.count();
517 if (cnt >= 1) {
518
519 if (!writeBytesMultiple(in, array, writeSpinCount)) {
520
521
522 return false;
523 }
524 } else {
525 in.removeBytes(0);
526 }
527 } else {
528 ByteBuffer[] buffers = in.nioBuffers();
529 int cnt = in.nioBufferCount();
530 if (cnt >= 1) {
531
532 if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) {
533
534
535 return false;
536 }
537 } else {
538 in.removeBytes(0);
539 }
540 }
541
542 return true;
543 }
544
545 @Override
546 protected Object filterOutboundMessage(Object msg) {
547 if (msg instanceof ByteBuf) {
548 ByteBuf buf = (ByteBuf) msg;
549 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
550 }
551
552 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
553 return msg;
554 }
555
556 throw new UnsupportedOperationException(
557 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
558 }
559
560 @UnstableApi
561 @Override
562 protected final void doShutdownOutput() throws Exception {
563 fd().shutdown(false, true);
564 }
565
566 @Override
567 public boolean isInputShutdown() {
568 return fd().isInputShutdown();
569 }
570
571 @Override
572 public boolean isOutputShutdown() {
573 return fd().isOutputShutdown();
574 }
575
576 @Override
577 public ChannelFuture shutdownOutput() {
578 return shutdownOutput(newPromise());
579 }
580
581 @Override
582 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
583 EventLoop loop = eventLoop();
584 if (loop.inEventLoop()) {
585 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
586 } else {
587 loop.execute(new Runnable() {
588 @Override
589 public void run() {
590 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
591 }
592 });
593 }
594
595 return promise;
596 }
597
598 @Override
599 protected void doClose() throws Exception {
600 try {
601
602 super.doClose();
603 } finally {
604 safeClosePipe(pipeIn);
605 safeClosePipe(pipeOut);
606 clearSpliceQueue();
607 }
608 }
609
610 private void clearSpliceQueue() {
611 if (spliceQueue == null) {
612 return;
613 }
614 for (;;) {
615 SpliceInTask task = spliceQueue.poll();
616 if (task == null) {
617 break;
618 }
619 task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
620 }
621 }
622
623 private static void safeClosePipe(FileDescriptor fd) {
624 if (fd != null) {
625 try {
626 fd.close();
627 } catch (IOException e) {
628 if (logger.isWarnEnabled()) {
629 logger.warn("Error while closing a pipe", e);
630 }
631 }
632 }
633 }
634
635 class EpollStreamUnsafe extends AbstractEpollUnsafe {
636
637 private RecvByteBufAllocator.Handle allocHandle;
638
639
640 @Override
641 protected Executor prepareToClose() {
642 return super.prepareToClose();
643 }
644
645 private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
646 if (byteBuf != null) {
647 if (byteBuf.isReadable()) {
648 readPending = false;
649 pipeline.fireChannelRead(byteBuf);
650 } else {
651 byteBuf.release();
652 }
653 }
654 pipeline.fireChannelReadComplete();
655 pipeline.fireExceptionCaught(cause);
656 if (close || cause instanceof IOException) {
657 shutdownInput();
658 return true;
659 }
660 return false;
661 }
662
663 @Override
664 void epollInReady() {
665 if (fd().isInputShutdown()) {
666 return;
667 }
668 final ChannelConfig config = config();
669 boolean edgeTriggered = isFlagSet(Native.EPOLLET);
670
671 if (!readPending && !edgeTriggered && !config.isAutoRead()) {
672
673 clearEpollIn0();
674 return;
675 }
676
677 final ChannelPipeline pipeline = pipeline();
678 final ByteBufAllocator allocator = config.getAllocator();
679 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
680 if (allocHandle == null) {
681 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
682 }
683
684 ByteBuf byteBuf = null;
685 boolean close = false;
686 try {
687
688 final int maxMessagesPerRead = edgeTriggered
689 ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
690 int messages = 0;
691 int totalReadAmount = 0;
692 do {
693 if (spliceQueue != null) {
694 SpliceInTask spliceTask = spliceQueue.peek();
695 if (spliceTask != null) {
696 if (spliceTask.spliceIn(allocHandle)) {
697
698
699 if (isActive()) {
700 spliceQueue.remove();
701 }
702 continue;
703 } else {
704 break;
705 }
706 }
707 }
708
709
710
711 byteBuf = allocHandle.allocate(allocator);
712 int writable = byteBuf.writableBytes();
713 int localReadAmount = doReadBytes(byteBuf);
714 if (localReadAmount <= 0) {
715
716 byteBuf.release();
717 close = localReadAmount < 0;
718 if (close) {
719
720 readPending = false;
721 }
722 break;
723 }
724 readPending = false;
725 pipeline.fireChannelRead(byteBuf);
726 byteBuf = null;
727
728 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
729 allocHandle.record(totalReadAmount);
730
731
732 totalReadAmount = localReadAmount;
733 } else {
734 totalReadAmount += localReadAmount;
735 }
736
737 if (localReadAmount < writable) {
738
739
740 break;
741 }
742 if (!edgeTriggered && !config.isAutoRead()) {
743
744
745
746 break;
747 }
748
749 if (fd().isInputShutdown()) {
750
751
752
753
754
755
756
757
758
759
760
761 break;
762 }
763 } while (++ messages < maxMessagesPerRead || isRdHup());
764
765 pipeline.fireChannelReadComplete();
766 allocHandle.record(totalReadAmount);
767
768 if (close) {
769 shutdownInput();
770 close = false;
771 }
772 } catch (Throwable t) {
773 boolean closed = handleReadException(pipeline, byteBuf, t, close);
774 if (!closed) {
775
776
777 eventLoop().execute(new Runnable() {
778 @Override
779 public void run() {
780 epollInReady();
781 }
782 });
783 }
784 } finally {
785
786
787
788
789
790
791 if (!readPending && !config.isAutoRead()) {
792 clearEpollIn0();
793 }
794 }
795 }
796 }
797
798 private void addToSpliceQueue(final SpliceInTask task) {
799 EventLoop eventLoop = eventLoop();
800 if (eventLoop.inEventLoop()) {
801 addToSpliceQueue0(task);
802 } else {
803 eventLoop.execute(new Runnable() {
804 @Override
805 public void run() {
806 addToSpliceQueue0(task);
807 }
808 });
809 }
810 }
811
812 private void addToSpliceQueue0(SpliceInTask task) {
813 if (spliceQueue == null) {
814 spliceQueue = PlatformDependent.newMpscQueue();
815 }
816 spliceQueue.add(task);
817 }
818
819 protected abstract class SpliceInTask {
820 final ChannelPromise promise;
821 int len;
822
823 protected SpliceInTask(int len, ChannelPromise promise) {
824 this.promise = promise;
825 this.len = len;
826 }
827
828 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
829
830 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
831
832 int length = Math.min(handle.guess(), len);
833 int splicedIn = 0;
834 for (;;) {
835
836 int localSplicedIn = Native.splice(fd().intValue(), -1, pipeOut.intValue(), -1, length);
837 if (localSplicedIn == 0) {
838 break;
839 }
840 splicedIn += localSplicedIn;
841 length -= localSplicedIn;
842 }
843
844
845 handle.record(splicedIn);
846 return splicedIn;
847 }
848 }
849
850
851 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
852 private final AbstractEpollStreamChannel ch;
853
854 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
855 super(len, promise);
856 this.ch = ch;
857 }
858
859 @Override
860 public void operationComplete(ChannelFuture future) throws Exception {
861 if (!future.isSuccess()) {
862 promise.setFailure(future.cause());
863 }
864 }
865
866 @Override
867 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
868 assert ch.eventLoop().inEventLoop();
869 if (len == 0) {
870 promise.setSuccess();
871 return true;
872 }
873 try {
874
875
876
877 FileDescriptor pipeOut = ch.pipeOut;
878 if (pipeOut == null) {
879
880 FileDescriptor[] pipe = pipe();
881 ch.pipeIn = pipe[0];
882 pipeOut = ch.pipeOut = pipe[1];
883 }
884
885 int splicedIn = spliceIn(pipeOut, handle);
886 if (splicedIn > 0) {
887
888 if (len != Integer.MAX_VALUE) {
889 len -= splicedIn;
890 }
891
892
893
894 final ChannelPromise splicePromise;
895 if (len == 0) {
896 splicePromise = promise;
897 } else {
898 splicePromise = ch.newPromise().addListener(this);
899 }
900
901 boolean autoRead = config().isAutoRead();
902
903
904
905 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
906 ch.unsafe().flush();
907 if (autoRead && !splicePromise.isDone()) {
908
909
910
911
912 config().setAutoRead(false);
913 }
914 }
915
916 return len == 0;
917 } catch (Throwable cause) {
918 promise.setFailure(cause);
919 return true;
920 }
921 }
922 }
923
924 private final class SpliceOutTask {
925 private final AbstractEpollStreamChannel ch;
926 private final boolean autoRead;
927 private int len;
928
929 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
930 this.ch = ch;
931 this.len = len;
932 this.autoRead = autoRead;
933 }
934
935 public boolean spliceOut() throws Exception {
936 assert ch.eventLoop().inEventLoop();
937 try {
938 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.fd().intValue(), -1, len);
939 len -= splicedOut;
940 if (len == 0) {
941 if (autoRead) {
942
943 config().setAutoRead(true);
944 }
945 return true;
946 }
947 return false;
948 } catch (IOException e) {
949 if (autoRead) {
950
951 config().setAutoRead(true);
952 }
953 throw e;
954 }
955 }
956 }
957
958 private final class SpliceFdTask extends SpliceInTask {
959 private final FileDescriptor fd;
960 private final ChannelPromise promise;
961 private final int offset;
962
963 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
964 super(len, promise);
965 this.fd = fd;
966 this.promise = promise;
967 this.offset = offset;
968 }
969
970 @Override
971 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
972 assert eventLoop().inEventLoop();
973 if (len == 0) {
974 promise.setSuccess();
975 return true;
976 }
977
978 try {
979 FileDescriptor[] pipe = pipe();
980 FileDescriptor pipeIn = pipe[0];
981 FileDescriptor pipeOut = pipe[1];
982 try {
983 int splicedIn = spliceIn(pipeOut, handle);
984 if (splicedIn > 0) {
985
986 if (len != Integer.MAX_VALUE) {
987 len -= splicedIn;
988 }
989 do {
990 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
991 splicedIn -= splicedOut;
992 } while (splicedIn > 0);
993 if (len == 0) {
994 promise.setSuccess();
995 return true;
996 }
997 }
998 return false;
999 } finally {
1000 safeClosePipe(pipeIn);
1001 safeClosePipe(pipeOut);
1002 }
1003 } catch (Throwable cause) {
1004 promise.setFailure(cause);
1005 return true;
1006 }
1007 }
1008 }
1009
1010 private final class SocketWritableByteChannel implements WritableByteChannel {
1011
1012 @Override
1013 public int write(ByteBuffer src) throws IOException {
1014 final int written;
1015 int position = src.position();
1016 int limit = src.limit();
1017 if (src.isDirect()) {
1018 written = fd().write(src, position, src.limit());
1019 } else {
1020 final int readableBytes = limit - position;
1021 ByteBuf buffer = null;
1022 try {
1023 if (readableBytes == 0) {
1024 buffer = Unpooled.EMPTY_BUFFER;
1025 } else {
1026 final ByteBufAllocator alloc = alloc();
1027 if (alloc.isDirectBufferPooled()) {
1028 buffer = alloc.directBuffer(readableBytes);
1029 } else {
1030 buffer = ByteBufUtil.threadLocalDirectBuffer();
1031 if (buffer == null) {
1032 buffer = Unpooled.directBuffer(readableBytes);
1033 }
1034 }
1035 }
1036 buffer.writeBytes(src.duplicate());
1037 ByteBuffer nioBuffer = buffer.internalNioBuffer(buffer.readerIndex(), readableBytes);
1038 written = fd().write(nioBuffer, nioBuffer.position(), nioBuffer.limit());
1039 } finally {
1040 if (buffer != null) {
1041 buffer.release();
1042 }
1043 }
1044 }
1045 if (written > 0) {
1046 src.position(position + written);
1047 }
1048 return written;
1049 }
1050
1051 @Override
1052 public boolean isOpen() {
1053 return fd().isOpen();
1054 }
1055
1056 @Override
1057 public void close() throws IOException {
1058 fd().close();
1059 }
1060 }
1061 }