View Javadoc

1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelConfig;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPipeline;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.channel.DefaultFileRegion;
30  import io.netty.channel.EventLoop;
31  import io.netty.channel.FileRegion;
32  import io.netty.channel.RecvByteBufAllocator;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.Socket;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.ThrowableUtil;
39  import io.netty.util.internal.UnstableApi;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.WritableByteChannel;
48  import java.util.Queue;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.unix.FileDescriptor.pipe;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  
54  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
55  
56      private static final String EXPECTED_TYPES =
57              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
58                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
60      private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
61              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
62                      AbstractEpollStreamChannel.class, "clearSpliceQueue()");
63      private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
64              new ClosedChannelException(),
65              AbstractEpollStreamChannel.class, "spliceTo(...)");
66      private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
67              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
68              AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
69  
70      private Queue<SpliceInTask> spliceQueue;
71  
72      // Lazy init these if we need to splice(...)
73      private FileDescriptor pipeIn;
74      private FileDescriptor pipeOut;
75  
76      private WritableByteChannel byteChannel;
77  
/**
 * Creates a channel for the raw descriptor number {@code fd} by wrapping it in a new {@link Socket}
 * and delegating to {@link #AbstractEpollStreamChannel(Channel, Socket)}.
 *
 * @param parent the parent channel passed on to the delegate constructor
 * @param fd     the descriptor number to wrap
 * @deprecated Use {@link #AbstractEpollStreamChannel(Channel, Socket)}.
 */
@Deprecated
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new Socket(fd));
}
85  
/**
 * Creates a parent-less channel for the raw descriptor number {@code fd} by wrapping it in a new
 * {@link Socket}.
 *
 * @param fd the descriptor number to wrap
 * @deprecated Use {@link #AbstractEpollStreamChannel(Socket, boolean)}.
 */
@Deprecated
protected AbstractEpollStreamChannel(int fd) {
    this(new Socket(fd));
}
93  
/**
 * Creates a channel from an existing {@link FileDescriptor}; a new {@link Socket} is built from
 * the descriptor's {@code intValue()}.
 *
 * @param fd the descriptor whose numeric value is wrapped
 * @deprecated Use {@link #AbstractEpollStreamChannel(Socket, boolean)}.
 */
@Deprecated
protected AbstractEpollStreamChannel(FileDescriptor fd) {
    this(new Socket(fd.intValue()));
}
101 
/**
 * Creates a channel for {@code fd}, deriving the {@code active} argument of
 * {@link #AbstractEpollStreamChannel(Socket, boolean)} from {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 * @deprecated Use {@link #AbstractEpollStreamChannel(Socket, boolean)}.
 */
@Deprecated
protected AbstractEpollStreamChannel(Socket fd) {
    this(fd, isSoErrorZero(fd));
}
109 
/**
 * Creates a channel with the given parent, registered for {@code EPOLLIN} and {@code EPOLLRDHUP}.
 *
 * @param parent the parent channel
 * @param fd     the socket backing this channel
 */
protected AbstractEpollStreamChannel(Channel parent, Socket fd) {
    // NOTE(review): the trailing 'true' is presumably the "active" flag of the super constructor - confirm.
    super(parent, fd, Native.EPOLLIN, true);
    // EPOLLRDHUP lets us learn when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}
115 
/**
 * Creates a channel with the given parent and remote address, registered for {@code EPOLLIN} and
 * {@code EPOLLRDHUP}.
 *
 * @param parent the parent channel
 * @param fd     the socket backing this channel
 * @param remote the remote address handed to the super constructor
 */
AbstractEpollStreamChannel(Channel parent, Socket fd, SocketAddress remote) {
    super(parent, fd, Native.EPOLLIN, remote);
    // EPOLLRDHUP lets us learn when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}
121 
/**
 * Creates a parent-less channel, registered for {@code EPOLLIN} and {@code EPOLLRDHUP}.
 *
 * @param fd     the socket backing this channel
 * @param active passed through unchanged to the super constructor
 */
protected AbstractEpollStreamChannel(Socket fd, boolean active) {
    super(null, fd, Native.EPOLLIN, active);
    // EPOLLRDHUP lets us learn when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}
127 
128     @Override
129     protected AbstractEpollUnsafe newUnsafe() {
130         return new EpollStreamUnsafe();
131     }
132 
133     /**
134      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
135      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
136      * splice until the {@link ChannelFuture} was canceled or it was failed.
137      *
138      * Please note:
139      * <ul>
140      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
141      *   {@link IllegalArgumentException} is thrown. </li>
142      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
143      *   target {@link AbstractEpollStreamChannel}</li>
144      * </ul>
145      *
146      */
147     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
148         return spliceTo(ch, len, newPromise());
149     }
150 
151     /**
152      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
153      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
154      * splice until the {@link ChannelFuture} was canceled or it was failed.
155      *
156      * Please note:
157      * <ul>
158      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
159      *   {@link IllegalArgumentException} is thrown. </li>
160      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
161      *   target {@link AbstractEpollStreamChannel}</li>
162      * </ul>
163      *
164      */
165     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
166                                         final ChannelPromise promise) {
167         if (ch.eventLoop() != eventLoop()) {
168             throw new IllegalArgumentException("EventLoops are not the same.");
169         }
170         if (len < 0) {
171             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
172         }
173         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
174                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
175             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
176         }
177         checkNotNull(promise, "promise");
178         if (!isOpen()) {
179             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
180         } else {
181             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
182             failSpliceIfClosed(promise);
183         }
184         return promise;
185     }
186 
187     /**
188      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
189      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
190      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
191      * {@link ChannelFuture} was canceled or it was failed.
192      *
193      * Please note:
194      * <ul>
195      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
196      *   {@link AbstractEpollStreamChannel}</li>
197      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
198      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
199      * </ul>
200      */
201     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
202         return spliceTo(ch, offset, len, newPromise());
203     }
204 
205     /**
206      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
207      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
208      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
209      * {@link ChannelFuture} was canceled or it was failed.
210      *
211      * Please note:
212      * <ul>
213      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
214      *   {@link AbstractEpollStreamChannel}</li>
215      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
216      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
217      * </ul>
218      */
219     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
220                                         final ChannelPromise promise) {
221         if (len < 0) {
222             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
223         }
224         if (offset < 0) {
225             throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
226         }
227         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
228             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
229         }
230         checkNotNull(promise, "promise");
231         if (!isOpen()) {
232             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
233         } else {
234             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
235             failSpliceIfClosed(promise);
236         }
237         return promise;
238     }
239 
240     private void failSpliceIfClosed(ChannelPromise promise) {
241         if (!isOpen()) {
242             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
243             // cases where a future may not be notified otherwise.
244             if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
245                 eventLoop().execute(new Runnable() {
246                     @Override
247                     public void run() {
248                         // Call this via the EventLoop as it is a MPSC queue.
249                         clearSpliceQueue();
250                     }
251                 });
252             }
253         }
254     }
255 
256     /**
257      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
258      * @param buf           the {@link ByteBuf} from which the bytes should be written
259      */
260     private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
261         int readableBytes = buf.readableBytes();
262         if (readableBytes == 0) {
263             in.remove();
264             return true;
265         }
266 
267         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
268             int writtenBytes = doWriteBytes(buf, writeSpinCount);
269             in.removeBytes(writtenBytes);
270             return writtenBytes == readableBytes;
271         } else {
272             ByteBuffer[] nioBuffers = buf.nioBuffers();
273             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
274         }
275     }
276 
277     private boolean writeBytesMultiple(
278             ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {
279 
280         long expectedWrittenBytes = array.size();
281         final long initialExpectedWrittenBytes = expectedWrittenBytes;
282 
283         int cnt = array.count();
284 
285         assert expectedWrittenBytes != 0;
286         assert cnt != 0;
287 
288         boolean done = false;
289         int offset = 0;
290         int end = offset + cnt;
291         for (int i = writeSpinCount - 1; i >= 0; i--) {
292             long localWrittenBytes = fd().writevAddresses(array.memoryAddress(offset), cnt);
293             if (localWrittenBytes == 0) {
294                 break;
295             }
296             expectedWrittenBytes -= localWrittenBytes;
297 
298             if (expectedWrittenBytes == 0) {
299                 // Written everything, just break out here (fast-path)
300                 done = true;
301                 break;
302             }
303 
304             do {
305                 long bytes = array.processWritten(offset, localWrittenBytes);
306                 if (bytes == -1) {
307                     // incomplete write
308                     break;
309                 } else {
310                     offset++;
311                     cnt--;
312                     localWrittenBytes -= bytes;
313                 }
314             } while (offset < end && localWrittenBytes > 0);
315         }
316         in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
317         return done;
318     }
319 
320     private boolean writeBytesMultiple(
321             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
322             int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException {
323 
324         assert expectedWrittenBytes != 0;
325         final long initialExpectedWrittenBytes = expectedWrittenBytes;
326 
327         boolean done = false;
328         int offset = 0;
329         int end = offset + nioBufferCnt;
330         for (int i = writeSpinCount - 1; i >= 0; i--) {
331             long localWrittenBytes = fd().writev(nioBuffers, offset, nioBufferCnt);
332             if (localWrittenBytes == 0) {
333                 break;
334             }
335             expectedWrittenBytes -= localWrittenBytes;
336 
337             if (expectedWrittenBytes == 0) {
338                 // Written everything, just break out here (fast-path)
339                 done = true;
340                 break;
341             }
342             do {
343                 ByteBuffer buffer = nioBuffers[offset];
344                 int pos = buffer.position();
345                 int bytes = buffer.limit() - pos;
346                 if (bytes > localWrittenBytes) {
347                     buffer.position(pos + (int) localWrittenBytes);
348                     // incomplete write
349                     break;
350                 } else {
351                     offset++;
352                     nioBufferCnt--;
353                     localWrittenBytes -= bytes;
354                 }
355             } while (offset < end && localWrittenBytes > 0);
356         }
357 
358         in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
359         return done;
360     }
361 
362     /**
363      * Write a {@link DefaultFileRegion}
364      *
365      * @param region        the {@link DefaultFileRegion} from which the bytes should be written
366      * @return amount       the amount of written bytes
367      */
368     private boolean writeDefaultFileRegion(
369             ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
370         final long regionCount = region.count();
371         if (region.transfered() >= regionCount) {
372             in.remove();
373             return true;
374         }
375 
376         final long baseOffset = region.position();
377         boolean done = false;
378         long flushedAmount = 0;
379 
380         for (int i = writeSpinCount - 1; i >= 0; i--) {
381             final long offset = region.transfered();
382             final long localFlushedAmount =
383                     Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
384             if (localFlushedAmount == 0) {
385                 break;
386             }
387 
388             flushedAmount += localFlushedAmount;
389             if (region.transfered() >= regionCount) {
390                 done = true;
391                 break;
392             }
393         }
394 
395         if (flushedAmount > 0) {
396             in.progress(flushedAmount);
397         }
398 
399         if (done) {
400             in.remove();
401         }
402         return done;
403     }
404 
405     private boolean writeFileRegion(
406             ChannelOutboundBuffer in, FileRegion region, final int writeSpinCount) throws Exception {
407         if (region.transfered() >= region.count()) {
408             in.remove();
409             return true;
410         }
411 
412         boolean done = false;
413         long flushedAmount = 0;
414 
415         if (byteChannel == null) {
416             byteChannel = new SocketWritableByteChannel();
417         }
418         for (int i = writeSpinCount - 1; i >= 0; i--) {
419             final long localFlushedAmount = region.transferTo(byteChannel, region.transfered());
420             if (localFlushedAmount == 0) {
421                 break;
422             }
423 
424             flushedAmount += localFlushedAmount;
425             if (region.transfered() >= region.count()) {
426                 done = true;
427                 break;
428             }
429         }
430 
431         if (flushedAmount > 0) {
432             in.progress(flushedAmount);
433         }
434 
435         if (done) {
436             in.remove();
437         }
438         return done;
439     }
440 
441     @Override
442     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
443         int writeSpinCount = config().getWriteSpinCount();
444         for (;;) {
445             final int msgCount = in.size();
446 
447             if (msgCount == 0) {
448                 // Wrote all messages.
449                 clearFlag(Native.EPOLLOUT);
450                 // Return here so we not set the EPOLLOUT flag.
451                 return;
452             }
453 
454             // Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
455             if (msgCount > 1 && in.current() instanceof ByteBuf) {
456                 if (!doWriteMultiple(in, writeSpinCount)) {
457                     // Break the loop and so set EPOLLOUT flag.
458                     break;
459                 }
460 
461                 // We do not break the loop here even if the outbound buffer was flushed completely,
462                 // because a user might have triggered another write and flush when we notify his or her
463                 // listeners.
464             } else { // msgCount == 1
465                 if (!doWriteSingle(in, writeSpinCount)) {
466                     // Break the loop and so set EPOLLOUT flag.
467                     break;
468                 }
469             }
470         }
471         // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
472         // when it can accept more data.
473         setFlag(Native.EPOLLOUT);
474     }
475 
476     protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
477         // The outbound buffer contains only one message or it contains a file region.
478         Object msg = in.current();
479         if (msg instanceof ByteBuf) {
480             if (!writeBytes(in, (ByteBuf) msg, writeSpinCount)) {
481                 // was not able to write everything so break here we will get notified later again once
482                 // the network stack can handle more writes.
483                 return false;
484             }
485         } else if (msg instanceof DefaultFileRegion) {
486             if (!writeDefaultFileRegion(in, (DefaultFileRegion) msg, writeSpinCount)) {
487                 // was not able to write everything so break here we will get notified later again once
488                 // the network stack can handle more writes.
489                 return false;
490             }
491         } else if (msg instanceof FileRegion) {
492             if (!writeFileRegion(in, (FileRegion) msg, writeSpinCount)) {
493                 // was not able to write everything so break here we will get notified later again once
494                 // the network stack can handle more writes.
495                 return false;
496             }
497         } else if (msg instanceof SpliceOutTask) {
498             if (!((SpliceOutTask) msg).spliceOut()) {
499                 return false;
500             }
501             in.remove();
502         } else {
503             // Should never reach here.
504             throw new Error();
505         }
506 
507         return true;
508     }
509 
510     private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
511         if (PlatformDependent.hasUnsafe()) {
512             // this means we can cast to IovArray and write the IovArray directly.
513             IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
514             in.forEachFlushedMessage(array);
515 
516             int cnt = array.count();
517             if (cnt >= 1) {
518                 // TODO: Handle the case where cnt == 1 specially.
519                 if (!writeBytesMultiple(in, array, writeSpinCount)) {
520                     // was not able to write everything so break here we will get notified later again once
521                     // the network stack can handle more writes.
522                     return false;
523                 }
524             } else { // cnt == 0, which means the outbound buffer contained empty buffers only.
525                 in.removeBytes(0);
526             }
527         } else {
528             ByteBuffer[] buffers = in.nioBuffers();
529             int cnt = in.nioBufferCount();
530             if (cnt >= 1) {
531                 // TODO: Handle the case where cnt == 1 specially.
532                 if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) {
533                     // was not able to write everything so break here we will get notified later again once
534                     // the network stack can handle more writes.
535                     return false;
536                 }
537             } else { // cnt == 0, which means the outbound buffer contained empty buffers only.
538                 in.removeBytes(0);
539             }
540         }
541 
542         return true;
543     }
544 
545     @Override
546     protected Object filterOutboundMessage(Object msg) {
547         if (msg instanceof ByteBuf) {
548             ByteBuf buf = (ByteBuf) msg;
549             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
550         }
551 
552         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
553             return msg;
554         }
555 
556         throw new UnsupportedOperationException(
557                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
558     }
559 
560     @UnstableApi
561     @Override
562     protected final void doShutdownOutput() throws Exception {
563         fd().shutdown(false, true);
564     }
565 
566     @Override
567     public boolean isInputShutdown() {
568         return fd().isInputShutdown();
569     }
570 
571     @Override
572     public boolean isOutputShutdown() {
573         return fd().isOutputShutdown();
574     }
575 
576     @Override
577     public ChannelFuture shutdownOutput() {
578         return shutdownOutput(newPromise());
579     }
580 
581     @Override
582     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
583         EventLoop loop = eventLoop();
584         if (loop.inEventLoop()) {
585             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
586         } else {
587             loop.execute(new Runnable() {
588                 @Override
589                 public void run() {
590                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
591                 }
592             });
593         }
594 
595         return promise;
596     }
597 
598     @Override
599     protected void doClose() throws Exception {
600         try {
601             // Calling super.doClose() first so spliceTo(...) will fail on next call.
602             super.doClose();
603         } finally {
604             safeClosePipe(pipeIn);
605             safeClosePipe(pipeOut);
606             clearSpliceQueue();
607         }
608     }
609 
610     private void clearSpliceQueue() {
611         if (spliceQueue == null) {
612             return;
613         }
614         for (;;) {
615             SpliceInTask task = spliceQueue.poll();
616             if (task == null) {
617                 break;
618             }
619             task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
620         }
621     }
622 
623     private static void safeClosePipe(FileDescriptor fd) {
624         if (fd != null) {
625             try {
626                 fd.close();
627             } catch (IOException e) {
628                 if (logger.isWarnEnabled()) {
629                     logger.warn("Error while closing a pipe", e);
630                 }
631             }
632         }
633     }
634 
635     class EpollStreamUnsafe extends AbstractEpollUnsafe {
636 
637         private RecvByteBufAllocator.Handle allocHandle;
638 
639         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
640         @Override
641         protected Executor prepareToClose() {
642             return super.prepareToClose();
643         }
644 
645         private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
646             if (byteBuf != null) {
647                 if (byteBuf.isReadable()) {
648                     readPending = false;
649                     pipeline.fireChannelRead(byteBuf);
650                 } else {
651                     byteBuf.release();
652                 }
653             }
654             pipeline.fireChannelReadComplete();
655             pipeline.fireExceptionCaught(cause);
656             if (close || cause instanceof IOException) {
657                 shutdownInput();
658                 return true;
659             }
660             return false;
661         }
662 
663         @Override
664         void epollInReady() {
665             if (fd().isInputShutdown()) {
666                 return;
667             }
668             final ChannelConfig config = config();
669             boolean edgeTriggered = isFlagSet(Native.EPOLLET);
670 
671             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
672                 // ChannelConfig.setAutoRead(false) was called in the meantime
673                 clearEpollIn0();
674                 return;
675             }
676 
677             final ChannelPipeline pipeline = pipeline();
678             final ByteBufAllocator allocator = config.getAllocator();
679             RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
680             if (allocHandle == null) {
681                 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
682             }
683 
684             ByteBuf byteBuf = null;
685             boolean close = false;
686             try {
687                 // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
688                 final int maxMessagesPerRead = edgeTriggered
689                         ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
690                 int messages = 0;
691                 int totalReadAmount = 0;
692                 do {
693                     if (spliceQueue != null) {
694                         SpliceInTask spliceTask = spliceQueue.peek();
695                         if (spliceTask != null) {
696                             if (spliceTask.spliceIn(allocHandle)) {
697                                 // We need to check if it is still active as if not we removed all SpliceTasks in
698                                 // doClose(...)
699                                 if (isActive()) {
700                                     spliceQueue.remove();
701                                 }
702                                 continue;
703                             } else {
704                                 break;
705                             }
706                         }
707                     }
708 
709                     // we use a direct buffer here as the native implementations only be able
710                     // to handle direct buffers.
711                     byteBuf = allocHandle.allocate(allocator);
712                     int writable = byteBuf.writableBytes();
713                     int localReadAmount = doReadBytes(byteBuf);
714                     if (localReadAmount <= 0) {
715                         // not was read release the buffer
716                         byteBuf.release();
717                         close = localReadAmount < 0;
718                         if (close) {
719                             // There is nothing left to read as we received an EOF.
720                             readPending = false;
721                         }
722                         break;
723                     }
724                     readPending = false;
725                     pipeline.fireChannelRead(byteBuf);
726                     byteBuf = null;
727 
728                     if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
729                         allocHandle.record(totalReadAmount);
730 
731                         // Avoid overflow.
732                         totalReadAmount = localReadAmount;
733                     } else {
734                         totalReadAmount += localReadAmount;
735                     }
736 
737                     if (localReadAmount < writable) {
738                         // Read less than what the buffer can hold,
739                         // which might mean we drained the recv buffer completely.
740                         break;
741                     }
742                     if (!edgeTriggered && !config.isAutoRead()) {
743                         // This is not using EPOLLET so we can stop reading
744                         // ASAP as we will get notified again later with
745                         // pending data
746                         break;
747                     }
748 
749                     if (fd().isInputShutdown()) {
750                         // We need to do this for two reasons:
751                         //
752                         // - If the input was shut down in between (which may be the case when the user did it in the
753                         //   fireChannelRead(...) method) we should not try to read again, to avoid producing any
754                         //   misleading exceptions.
755                         //
756                         // - If the user closes the channel we need to ensure we do not try to read from it again as
757                         //   the file descriptor may already be re-used by the OS if the system is handling a lot of
758                         //   concurrent connections and so needs a lot of file descriptors. If we do not do this we
759                         //   risk reading data from a file descriptor that belongs to a different socket than the one
760                         //   that was "wrapped" by this Channel implementation.
761                         break;
762                     }
763                 } while (++ messages < maxMessagesPerRead || isRdHup());
764 
765                 pipeline.fireChannelReadComplete();
766                 allocHandle.record(totalReadAmount);
767 
768                 if (close) {
769                     shutdownInput();
770                     close = false;
771                 }
772             } catch (Throwable t) {
773                 boolean closed = handleReadException(pipeline, byteBuf, t, close);
774                 if (!closed) {
775                     // trigger a read again as there may be something left to read and because of epoll ET we
776                     // will not get notified again until we read everything from the socket
777                     eventLoop().execute(new Runnable() {
778                         @Override
779                         public void run() {
780                             epollInReady();
781                         }
782                     });
783                 }
784             } finally {
785                 // Check if there is a readPending which was not processed yet.
786                 // This could be for two reasons:
787                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
788                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
789                 //
790                 // See https://github.com/netty/netty/issues/2254
791                 if (!readPending && !config.isAutoRead()) {
792                     clearEpollIn0();
793                 }
794             }
795         }
796     }
797 
798     private void addToSpliceQueue(final SpliceInTask task) {
799         EventLoop eventLoop = eventLoop();
800         if (eventLoop.inEventLoop()) {
801             addToSpliceQueue0(task);
802         } else {
803             eventLoop.execute(new Runnable() {
804                 @Override
805                 public void run() {
806                     addToSpliceQueue0(task);
807                 }
808             });
809         }
810     }
811 
812     private void addToSpliceQueue0(SpliceInTask task) {
813         if (spliceQueue == null) {
814             spliceQueue = PlatformDependent.newMpscQueue();
815         }
816         spliceQueue.add(task);
817     }
818 
819     protected abstract class SpliceInTask {
820         final ChannelPromise promise;
821         int len;
822 
823         protected SpliceInTask(int len, ChannelPromise promise) {
824             this.promise = promise;
825             this.len = len;
826         }
827 
828         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
829 
830         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
831             // calculate the maximum amount of data we are allowed to splice
832             int length = Math.min(handle.guess(), len);
833             int splicedIn = 0;
834             for (;;) {
835                 // Splicing until there is nothing left to splice.
836                 int localSplicedIn = Native.splice(fd().intValue(), -1, pipeOut.intValue(), -1, length);
837                 if (localSplicedIn == 0) {
838                     break;
839                 }
840                 splicedIn += localSplicedIn;
841                 length -= localSplicedIn;
842             }
843 
844             // record the number of bytes we spliced before
845             handle.record(splicedIn);
846             return splicedIn;
847         }
848     }
849 
850     // Let it directly implement channelFutureListener as well to reduce object creation.
851     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
852         private final AbstractEpollStreamChannel ch;
853 
854         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
855             super(len, promise);
856             this.ch = ch;
857         }
858 
859         @Override
860         public void operationComplete(ChannelFuture future) throws Exception {
861             if (!future.isSuccess()) {
862                 promise.setFailure(future.cause());
863             }
864         }
865 
866         @Override
867         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
868             assert ch.eventLoop().inEventLoop();
869             if (len == 0) {
870                 promise.setSuccess();
871                 return true;
872             }
873             try {
874                 // We create the pipe on the target channel as this will allow us to just handle pending writes
875                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
876                 // on multiple Channels pointing to one target Channel.
877                 FileDescriptor pipeOut = ch.pipeOut;
878                 if (pipeOut == null) {
879                     // Create a new pipe as non was created before.
880                     FileDescriptor[] pipe = pipe();
881                     ch.pipeIn = pipe[0];
882                     pipeOut = ch.pipeOut = pipe[1];
883                 }
884 
885                 int splicedIn = spliceIn(pipeOut, handle);
886                 if (splicedIn > 0) {
887                     // Integer.MAX_VALUE is a special value which will result in splice forever.
888                     if (len != Integer.MAX_VALUE) {
889                         len -= splicedIn;
890                     }
891 
892                     // Depending on if we are done with splicing inbound data we set the right promise for the
893                     // outbound splicing.
894                     final ChannelPromise splicePromise;
895                     if (len == 0) {
896                         splicePromise = promise;
897                     } else {
898                         splicePromise = ch.newPromise().addListener(this);
899                     }
900 
901                     boolean autoRead = config().isAutoRead();
902 
903                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
904                     // case.
905                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
906                     ch.unsafe().flush();
907                     if (autoRead && !splicePromise.isDone()) {
908                         // Write was not done which means the target channel was not writable. In this case we need to
909                         // disable reading until we are done with splicing to the target channel because:
910                         //
911                         // - The user may want to to trigger another splice operation once the splicing was complete.
912                         config().setAutoRead(false);
913                     }
914                 }
915 
916                 return len == 0;
917             } catch (Throwable cause) {
918                 promise.setFailure(cause);
919                 return true;
920             }
921         }
922     }
923 
924     private final class SpliceOutTask {
925         private final AbstractEpollStreamChannel ch;
926         private final boolean autoRead;
927         private int len;
928 
929         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
930             this.ch = ch;
931             this.len = len;
932             this.autoRead = autoRead;
933         }
934 
935         public boolean spliceOut() throws Exception {
936             assert ch.eventLoop().inEventLoop();
937             try {
938                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.fd().intValue(), -1, len);
939                 len -= splicedOut;
940                 if (len == 0) {
941                     if (autoRead) {
942                         // AutoRead was used and we spliced everything so start reading again
943                         config().setAutoRead(true);
944                     }
945                     return true;
946                 }
947                 return false;
948             } catch (IOException e) {
949                 if (autoRead) {
950                     // AutoRead was used and we spliced everything so start reading again
951                     config().setAutoRead(true);
952                 }
953                 throw e;
954             }
955         }
956     }
957 
958     private final class SpliceFdTask extends SpliceInTask {
959         private final FileDescriptor fd;
960         private final ChannelPromise promise;
961         private final int offset;
962 
963         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
964             super(len, promise);
965             this.fd = fd;
966             this.promise = promise;
967             this.offset = offset;
968         }
969 
970         @Override
971         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
972             assert eventLoop().inEventLoop();
973             if (len == 0) {
974                 promise.setSuccess();
975                 return true;
976             }
977 
978             try {
979                 FileDescriptor[] pipe = pipe();
980                 FileDescriptor pipeIn = pipe[0];
981                 FileDescriptor pipeOut = pipe[1];
982                 try {
983                     int splicedIn = spliceIn(pipeOut, handle);
984                     if (splicedIn > 0) {
985                         // Integer.MAX_VALUE is a special value which will result in splice forever.
986                         if (len != Integer.MAX_VALUE) {
987                             len -= splicedIn;
988                         }
989                         do {
990                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
991                             splicedIn -= splicedOut;
992                         } while (splicedIn > 0);
993                         if (len == 0) {
994                             promise.setSuccess();
995                             return true;
996                         }
997                     }
998                     return false;
999                 } finally {
1000                     safeClosePipe(pipeIn);
1001                     safeClosePipe(pipeOut);
1002                 }
1003             } catch (Throwable cause) {
1004                 promise.setFailure(cause);
1005                 return true;
1006             }
1007         }
1008     }
1009 
1010     private final class SocketWritableByteChannel implements WritableByteChannel {
1011 
1012         @Override
1013         public int write(ByteBuffer src) throws IOException {
1014             final int written;
1015             int position = src.position();
1016             int limit = src.limit();
1017             if (src.isDirect()) {
1018                 written = fd().write(src, position, src.limit());
1019             } else {
1020                 final int readableBytes = limit - position;
1021                 ByteBuf buffer = null;
1022                 try {
1023                     if (readableBytes == 0) {
1024                         buffer = Unpooled.EMPTY_BUFFER;
1025                     } else {
1026                         final ByteBufAllocator alloc = alloc();
1027                         if (alloc.isDirectBufferPooled()) {
1028                             buffer = alloc.directBuffer(readableBytes);
1029                         } else {
1030                             buffer = ByteBufUtil.threadLocalDirectBuffer();
1031                             if (buffer == null) {
1032                                 buffer = Unpooled.directBuffer(readableBytes);
1033                             }
1034                         }
1035                     }
1036                     buffer.writeBytes(src.duplicate());
1037                     ByteBuffer nioBuffer = buffer.internalNioBuffer(buffer.readerIndex(), readableBytes);
1038                     written = fd().write(nioBuffer, nioBuffer.position(), nioBuffer.limit());
1039                 } finally {
1040                     if (buffer != null) {
1041                         buffer.release();
1042                     }
1043                 }
1044             }
1045             if (written > 0) {
1046                 src.position(position + written);
1047             }
1048             return written;
1049         }
1050 
1051         @Override
1052         public boolean isOpen() {
1053             return fd().isOpen();
1054         }
1055 
1056         @Override
1057         public void close() throws IOException {
1058             fd().close();
1059         }
1060     }
1061 }