View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.WritableByteChannel;
48  import java.util.Queue;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53  import static io.netty.channel.unix.FileDescriptor.pipe;
54  import static io.netty.util.internal.ObjectUtil.checkNotNull;
55  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56  
57  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
58      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
59      private static final String EXPECTED_TYPES =
60              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
61                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63  
    // Task submitted to the event loop by doWrite(...) once the write quantum has been used up, so that
    // the remaining flushed data is written on a later event-loop iteration.
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Call flush0 directly (instead of flush) so that messages added via write(...) in the
            // meantime are not flushed as a side effect.
            ((AbstractEpollUnsafe) unsafe()).flush0();
        }
    };
72  
    // Splice state; lazily initialised only if splice(...) is actually used.
    // NOTE(review): the queue reference is volatile, presumably because spliceTo(...) may be invoked
    // from outside the event loop — confirm against addToSpliceQueue(...).
    private volatile Queue<SpliceInTask> spliceQueue;
    private FileDescriptor pipeIn;
    private FileDescriptor pipeOut;

    // Created on first use by writeFileRegion(...) and reused for subsequent FileRegion transfers.
    private WritableByteChannel byteChannel;
79  
    /**
     * Creates a channel for the raw file descriptor {@code fd} by wrapping it in a {@link LinuxSocket}
     * and delegating to the {@code (Channel, LinuxSocket)} constructor.
     *
     * @param parent the parent channel handed through to the delegate constructor
     * @param fd the raw file descriptor to wrap
     */
    protected AbstractEpollStreamChannel(Channel parent, int fd) {
        this(parent, new LinuxSocket(fd));
    }
83  
    /**
     * Creates a parent-less channel for the raw file descriptor {@code fd} by wrapping it in a
     * {@link LinuxSocket} and delegating to the {@code (LinuxSocket)} constructor.
     *
     * @param fd the raw file descriptor to wrap
     */
    protected AbstractEpollStreamChannel(int fd) {
        this(new LinuxSocket(fd));
    }
87  
    /**
     * Creates a parent-less channel for {@code fd}; the {@code active} flag passed on to the
     * {@code (LinuxSocket, boolean)} constructor is the result of {@code isSoErrorZero(fd)}.
     *
     * @param fd the socket backing this channel
     */
    AbstractEpollStreamChannel(LinuxSocket fd) {
        this(fd, isSoErrorZero(fd));
    }
91  
    /**
     * Creates a channel with the given parent, passing {@code true} as the third super-constructor
     * argument (the position used for the {@code active} flag elsewhere in this class).
     *
     * @param parent the parent channel
     * @param fd the socket backing this channel
     */
    AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
        super(parent, fd, true);
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        flags |= Native.EPOLLRDHUP;
    }
97  
    /**
     * Creates a channel with the given parent and remote address.
     *
     * @param parent the parent channel
     * @param fd the socket backing this channel
     * @param remote the remote address handed to the super constructor
     */
    protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
        super(parent, fd, remote);
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        flags |= Native.EPOLLRDHUP;
    }
103 
    /**
     * Creates a parent-less channel with an explicit {@code active} flag.
     *
     * @param fd the socket backing this channel
     * @param active passed unchanged to the super constructor
     */
    protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
        super(null, fd, active);
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        flags |= Native.EPOLLRDHUP;
    }
109 
    /**
     * Creates the unsafe implementation for this channel: a new {@link EpollStreamUnsafe}.
     */
    @Override
    protected AbstractEpollUnsafe newUnsafe() {
        return new EpollStreamUnsafe();
    }
114 
    /**
     * Returns the shared {@link ChannelMetadata} constant of this channel type.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
119 
    /**
     * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel},
     * using a newly created promise.
     * The {@code len} is the number of bytes to splice. When {@link Integer#MAX_VALUE} is used, splicing
     * continues until the returned {@link ChannelFuture} is cancelled or fails.
     *
     * Please note:
     * <ul>
     *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
     *   {@link IllegalArgumentException} is thrown. </li>
     *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
     *   target {@link AbstractEpollStreamChannel}, otherwise an {@link IllegalStateException} is thrown.</li>
     * </ul>
     *
     * @param ch the channel to splice to
     * @param len the number of bytes to splice
     * @return the future that is notified about the outcome of the splice
     */
    public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
        return spliceTo(ch, len, newPromise());
    }
137 
138     /**
139      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
140      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
141      * splice until the {@link ChannelFuture} was canceled or it was failed.
142      *
143      * Please note:
144      * <ul>
145      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
146      *   {@link IllegalArgumentException} is thrown. </li>
147      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
148      *   target {@link AbstractEpollStreamChannel}</li>
149      * </ul>
150      *
151      */
152     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
153                                         final ChannelPromise promise) {
154         if (ch.eventLoop() != eventLoop()) {
155             throw new IllegalArgumentException("EventLoops are not the same.");
156         }
157         checkPositiveOrZero(len, "len");
158         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
159                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
160             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
161         }
162         checkNotNull(promise, "promise");
163         if (!isOpen()) {
164             promise.tryFailure(new ClosedChannelException());
165         } else {
166             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
167             failSpliceIfClosed(promise);
168         }
169         return promise;
170     }
171 
    /**
     * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}, using a newly
     * created promise.
     * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
     * number of bytes to splice. When {@link Integer#MAX_VALUE} is used, splicing continues until the
     * returned {@link ChannelFuture} is cancelled or fails.
     *
     * Please note:
     * <ul>
     *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
     *   {@link AbstractEpollStreamChannel}</li>
     *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
     *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
     * </ul>
     *
     * @param ch the file descriptor to splice to
     * @param offset the offset within the file descriptor
     * @param len the number of bytes to splice
     * @return the future that is notified about the outcome of the splice
     */
    public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
        return spliceTo(ch, offset, len, newPromise());
    }
189 
190     /**
191      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
192      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
193      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
194      * {@link ChannelFuture} was canceled or it was failed.
195      *
196      * Please note:
197      * <ul>
198      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
199      *   {@link AbstractEpollStreamChannel}</li>
200      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
201      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
202      * </ul>
203      */
204     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
205                                         final ChannelPromise promise) {
206         checkPositiveOrZero(len, "len");
207         checkPositiveOrZero(offset, "offset");
208         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
209             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
210         }
211         checkNotNull(promise, "promise");
212         if (!isOpen()) {
213             promise.tryFailure(new ClosedChannelException());
214         } else {
215             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
216             failSpliceIfClosed(promise);
217         }
218         return promise;
219     }
220 
221     private void failSpliceIfClosed(ChannelPromise promise) {
222         if (!isOpen()) {
223             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
224             // cases where a future may not be notified otherwise.
225             if (!promise.isDone()) {
226                 final ClosedChannelException ex = new ClosedChannelException();
227                 if (promise.tryFailure(ex)) {
228                     eventLoop().execute(new Runnable() {
229                         @Override
230                         public void run() {
231                             // Call this via the EventLoop as it is a MPSC queue.
232                             clearSpliceQueue(ex);
233                         }
234                     });
235                 }
236             }
237         }
238     }
239 
240     /**
241      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
242      * @param in the collection which contains objects to write.
243      * @param buf the {@link ByteBuf} from which the bytes should be written
244      * @return The value that should be decremented from the write quantum which starts at
245      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
246      * <ul>
247      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
248      *     is encountered</li>
249      *     <li>1 - if a single call to write data was made to the OS</li>
250      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
251      *     no data was accepted</li>
252      * </ul>
253      */
254     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
255         int readableBytes = buf.readableBytes();
256         if (readableBytes == 0) {
257             in.remove();
258             return 0;
259         }
260 
261         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
262             return doWriteBytes(in, buf);
263         } else {
264             ByteBuffer[] nioBuffers = buf.nioBuffers();
265             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
266                     config().getMaxBytesPerGatheringWrite());
267         }
268     }
269 
270     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
271         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
272         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
273         // make a best effort to adjust as OS behavior changes.
274         if (attempted == written) {
275             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
276                 config().setMaxBytesPerGatheringWrite(attempted << 1);
277             }
278         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
279             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
280         }
281     }
282 
283     /**
284      * Write multiple bytes via {@link IovArray}.
285      * @param in the collection which contains objects to write.
286      * @param array The array which contains the content to write.
287      * @return The value that should be decremented from the write quantum which starts at
288      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
289      * <ul>
290      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
291      *     is encountered</li>
292      *     <li>1 - if a single call to write data was made to the OS</li>
293      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
294      *     no data was accepted</li>
295      * </ul>
296      * @throws IOException If an I/O exception occurs during write.
297      */
298     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
299         final long expectedWrittenBytes = array.size();
300         assert expectedWrittenBytes != 0;
301         final int cnt = array.count();
302         assert cnt != 0;
303 
304         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
305         if (localWrittenBytes > 0) {
306             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
307             in.removeBytes(localWrittenBytes);
308             return 1;
309         }
310         return WRITE_STATUS_SNDBUF_FULL;
311     }
312 
313     /**
314      * Write multiple bytes via {@link ByteBuffer} array.
315      * @param in the collection which contains objects to write.
316      * @param nioBuffers The buffers to write.
317      * @param nioBufferCnt The number of buffers to write.
318      * @param expectedWrittenBytes The number of bytes we expect to write.
319      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
320      * @return The value that should be decremented from the write quantum which starts at
321      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
322      * <ul>
323      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
324      *     is encountered</li>
325      *     <li>1 - if a single call to write data was made to the OS</li>
326      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
327      *     no data was accepted</li>
328      * </ul>
329      * @throws IOException If an I/O exception occurs during write.
330      */
331     private int writeBytesMultiple(
332             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
333             long maxBytesPerGatheringWrite) throws IOException {
334         assert expectedWrittenBytes != 0;
335         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
336             expectedWrittenBytes = maxBytesPerGatheringWrite;
337         }
338 
339         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
340         if (localWrittenBytes > 0) {
341             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
342             in.removeBytes(localWrittenBytes);
343             return 1;
344         }
345         return WRITE_STATUS_SNDBUF_FULL;
346     }
347 
348     /**
349      * Write a {@link DefaultFileRegion}
350      * @param in the collection which contains objects to write.
351      * @param region the {@link DefaultFileRegion} from which the bytes should be written
352      * @return The value that should be decremented from the write quantum which starts at
353      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
354      * <ul>
355      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
356      *     is encountered</li>
357      *     <li>1 - if a single call to write data was made to the OS</li>
358      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
359      *     no data was accepted</li>
360      * </ul>
361      */
362     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
363         final long offset = region.transferred();
364         final long regionCount = region.count();
365         if (offset >= regionCount) {
366             in.remove();
367             return 0;
368         }
369 
370         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
371         if (flushedAmount > 0) {
372             in.progress(flushedAmount);
373             if (region.transferred() >= regionCount) {
374                 in.remove();
375             }
376             return 1;
377         } else if (flushedAmount == 0) {
378             validateFileRegion(region, offset);
379         }
380         return WRITE_STATUS_SNDBUF_FULL;
381     }
382 
383     /**
384      * Write a {@link FileRegion}
385      * @param in the collection which contains objects to write.
386      * @param region the {@link FileRegion} from which the bytes should be written
387      * @return The value that should be decremented from the write quantum which starts at
388      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
389      * <ul>
390      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
391      *     is encountered</li>
392      *     <li>1 - if a single call to write data was made to the OS</li>
393      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
394      *     no data was accepted</li>
395      * </ul>
396      */
397     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
398         if (region.transferred() >= region.count()) {
399             in.remove();
400             return 0;
401         }
402 
403         if (byteChannel == null) {
404             byteChannel = new EpollSocketWritableByteChannel();
405         }
406         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
407         if (flushedAmount > 0) {
408             in.progress(flushedAmount);
409             if (region.transferred() >= region.count()) {
410                 in.remove();
411             }
412             return 1;
413         }
414         return WRITE_STATUS_SNDBUF_FULL;
415     }
416 
417     @Override
418     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
419         int writeSpinCount = config().getWriteSpinCount();
420         do {
421             final int msgCount = in.size();
422             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
423             if (msgCount > 1 && in.current() instanceof ByteBuf) {
424                 writeSpinCount -= doWriteMultiple(in);
425             } else if (msgCount == 0) {
426                 // Wrote all messages.
427                 clearFlag(Native.EPOLLOUT);
428                 // Return here so we not set the EPOLLOUT flag.
429                 return;
430             } else {  // msgCount == 1
431                 writeSpinCount -= doWriteSingle(in);
432             }
433 
434             // We do not break the loop here even if the outbound buffer was flushed completely,
435             // because a user might have triggered another write and flush when we notify his or her
436             // listeners.
437         } while (writeSpinCount > 0);
438 
439         if (writeSpinCount == 0) {
440             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
441             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
442             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
443             // and set the EPOLLOUT if necessary.
444             clearFlag(Native.EPOLLOUT);
445 
446             // We used our writeSpin quantum, and should try to write again later.
447             eventLoop().execute(flushTask);
448         } else {
449             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
450             // when it can accept more data.
451             setFlag(Native.EPOLLOUT);
452         }
453     }
454 
455     /**
456      * Attempt to write a single object.
457      * @param in the collection which contains objects to write.
458      * @return The value that should be decremented from the write quantum which starts at
459      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
460      * <ul>
461      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
462      *     is encountered</li>
463      *     <li>1 - if a single call to write data was made to the OS</li>
464      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
465      *     no data was accepted</li>
466      * </ul>
467      * @throws Exception If an I/O error occurs.
468      */
469     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
470         // The outbound buffer contains only one message or it contains a file region.
471         Object msg = in.current();
472         if (msg instanceof ByteBuf) {
473             return writeBytes(in, (ByteBuf) msg);
474         } else if (msg instanceof DefaultFileRegion) {
475             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
476         } else if (msg instanceof FileRegion) {
477             return writeFileRegion(in, (FileRegion) msg);
478         } else if (msg instanceof SpliceOutTask) {
479             if (!((SpliceOutTask) msg).spliceOut()) {
480                 return WRITE_STATUS_SNDBUF_FULL;
481             }
482             in.remove();
483             return 1;
484         } else {
485             // Should never reach here.
486             throw new Error();
487         }
488     }
489 
490     /**
491      * Attempt to write multiple {@link ByteBuf} objects.
492      * @param in the collection which contains objects to write.
493      * @return The value that should be decremented from the write quantum which starts at
494      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
495      * <ul>
496      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
497      *     is encountered</li>
498      *     <li>1 - if a single call to write data was made to the OS</li>
499      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
500      *     no data was accepted</li>
501      * </ul>
502      * @throws Exception If an I/O error occurs.
503      */
504     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
505         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
506         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
507         array.maxBytes(maxBytesPerGatheringWrite);
508         in.forEachFlushedMessage(array);
509 
510         if (array.count() >= 1) {
511             // TODO: Handle the case where cnt == 1 specially.
512             return writeBytesMultiple(in, array);
513         }
514         // cnt == 0, which means the outbound buffer contained empty buffers only.
515         in.removeBytes(0);
516         return 0;
517     }
518 
519     @Override
520     protected Object filterOutboundMessage(Object msg) {
521         if (msg instanceof ByteBuf) {
522             ByteBuf buf = (ByteBuf) msg;
523             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
524         }
525 
526         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
527             return msg;
528         }
529 
530         throw new UnsupportedOperationException(
531                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
532     }
533 
    /**
     * Shuts down the output side of the underlying socket, leaving the input side untouched.
     *
     * @throws Exception if the socket shutdown fails
     */
    @Override
    protected final void doShutdownOutput() throws Exception {
        // Arguments are (read, write): only the write side is shut down here.
        socket.shutdown(false, true);
    }
538 
    /**
     * Shuts down the input side of the underlying socket and completes {@code promise} with the outcome.
     *
     * @param promise marked successful after the shutdown call returns, or failed with whatever was thrown
     */
    private void shutdownInput0(final ChannelPromise promise) {
        try {
            // Arguments are (read, write): only the read side is shut down here.
            socket.shutdown(true, false);
            promise.setSuccess();
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
    }
547 
    /**
     * Returns what the underlying socket reports for {@code isOutputShutdown()}.
     */
    @Override
    public boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }
552 
    /**
     * Returns what the underlying socket reports for {@code isInputShutdown()}.
     */
    @Override
    public boolean isInputShutdown() {
        return socket.isInputShutdown();
    }
557 
    /**
     * Returns what the underlying socket reports for {@code isShutdown()}.
     */
    @Override
    public boolean isShutdown() {
        return socket.isShutdown();
    }
562 
    /**
     * Shuts down the output side using a newly created promise.
     *
     * @return the future notified once the shutdown has been attempted
     */
    @Override
    public ChannelFuture shutdownOutput() {
        return shutdownOutput(newPromise());
    }
567 
568     @Override
569     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
570         EventLoop loop = eventLoop();
571         if (loop.inEventLoop()) {
572             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
573         } else {
574             loop.execute(new Runnable() {
575                 @Override
576                 public void run() {
577                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
578                 }
579             });
580         }
581 
582         return promise;
583     }
584 
    /**
     * Shuts down the input side using a newly created promise.
     *
     * @return the future notified once the shutdown has been attempted
     */
    @Override
    public ChannelFuture shutdownInput() {
        return shutdownInput(newPromise());
    }
589 
590     @Override
591     public ChannelFuture shutdownInput(final ChannelPromise promise) {
592         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
593         if (closeExecutor != null) {
594             closeExecutor.execute(new Runnable() {
595                 @Override
596                 public void run() {
597                     shutdownInput0(promise);
598                 }
599             });
600         } else {
601             EventLoop loop = eventLoop();
602             if (loop.inEventLoop()) {
603                 shutdownInput0(promise);
604             } else {
605                 loop.execute(new Runnable() {
606                     @Override
607                     public void run() {
608                         shutdownInput0(promise);
609                     }
610                 });
611             }
612         }
613         return promise;
614     }
615 
    /**
     * Shuts down both directions using a newly created promise.
     *
     * @return the future notified once both shutdowns have been attempted
     */
    @Override
    public ChannelFuture shutdown() {
        return shutdown(newPromise());
    }
620 
621     @Override
622     public ChannelFuture shutdown(final ChannelPromise promise) {
623         ChannelFuture shutdownOutputFuture = shutdownOutput();
624         if (shutdownOutputFuture.isDone()) {
625             shutdownOutputDone(shutdownOutputFuture, promise);
626         } else {
627             shutdownOutputFuture.addListener(new ChannelFutureListener() {
628                 @Override
629                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
630                     shutdownOutputDone(shutdownOutputFuture, promise);
631                 }
632             });
633         }
634         return promise;
635     }
636 
637     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
638         ChannelFuture shutdownInputFuture = shutdownInput();
639         if (shutdownInputFuture.isDone()) {
640             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
641         } else {
642             shutdownInputFuture.addListener(new ChannelFutureListener() {
643                 @Override
644                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
645                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
646                 }
647             });
648         }
649     }
650 
651     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
652                               ChannelFuture shutdownInputFuture,
653                               ChannelPromise promise) {
654         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
655         Throwable shutdownInputCause = shutdownInputFuture.cause();
656         if (shutdownOutputCause != null) {
657             if (shutdownInputCause != null) {
658                 logger.debug("Exception suppressed because a previous exception occurred.",
659                         shutdownInputCause);
660             }
661             promise.setFailure(shutdownOutputCause);
662         } else if (shutdownInputCause != null) {
663             promise.setFailure(shutdownInputCause);
664         } else {
665             promise.setSuccess();
666         }
667     }
668 
    /**
     * Closes the channel through {@code super.doClose()} and then releases the splice resources held by
     * this class: both pipe descriptors are closed and every queued splice task is failed.
     *
     * @throws Exception if {@code super.doClose()} fails; the cleanup still runs in that case
     */
    @Override
    protected void doClose() throws Exception {
        try {
            // Calling super.doClose() first so spliceTo(...) will fail on next call.
            super.doClose();
        } finally {
            // Runs even when super.doClose() throws. safeClosePipe tolerates pipes that were never created,
            // and passing null lets clearSpliceQueue create the ClosedChannelException only if it is needed.
            safeClosePipe(pipeIn);
            safeClosePipe(pipeOut);
            clearSpliceQueue(null);
        }
    }
680 
681     private void clearSpliceQueue(ClosedChannelException exception) {
682         Queue<SpliceInTask> sQueue = spliceQueue;
683         if (sQueue == null) {
684             return;
685         }
686         for (;;) {
687             SpliceInTask task = sQueue.poll();
688             if (task == null) {
689                 break;
690             }
691             if (exception == null) {
692                 exception = new ClosedChannelException();
693             }
694             task.promise.tryFailure(exception);
695         }
696     }
697 
698     private static void safeClosePipe(FileDescriptor fd) {
699         if (fd != null) {
700             try {
701                 fd.close();
702             } catch (IOException e) {
703                 logger.warn("Error while closing a pipe", e);
704             }
705         }
706     }
707 
708     class EpollStreamUnsafe extends AbstractEpollUnsafe {
709         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
710         @Override
711         protected Executor prepareToClose() {
712             return super.prepareToClose();
713         }
714 
715         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
716                 EpollRecvByteAllocatorHandle allocHandle) {
717             if (byteBuf != null) {
718                 if (byteBuf.isReadable()) {
719                     readPending = false;
720                     pipeline.fireChannelRead(byteBuf);
721                 } else {
722                     byteBuf.release();
723                 }
724             }
725             allocHandle.readComplete();
726             pipeline.fireChannelReadComplete();
727             pipeline.fireExceptionCaught(cause);
728 
729             // If oom will close the read event, release connection.
730             // See https://github.com/netty/netty/issues/10434
731             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
732                 shutdownInput(false);
733             }
734         }
735 
736         @Override
737         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
738             return new EpollRecvByteAllocatorStreamingHandle(handle);
739         }
740 
741         @Override
742         void epollInReady() {
743             final ChannelConfig config = config();
744             if (shouldBreakEpollInReady(config)) {
745                 clearEpollIn0();
746                 return;
747             }
748             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
749             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
750 
751             final ChannelPipeline pipeline = pipeline();
752             final ByteBufAllocator allocator = config.getAllocator();
753             allocHandle.reset(config);
754             epollInBefore();
755 
756             ByteBuf byteBuf = null;
757             boolean close = false;
758             Queue<SpliceInTask> sQueue = null;
759             try {
760                 do {
761                     if (sQueue != null || (sQueue = spliceQueue) != null) {
762                         SpliceInTask spliceTask = sQueue.peek();
763                         if (spliceTask != null) {
764                             boolean spliceInResult = spliceTask.spliceIn(allocHandle);
765 
766                             if (allocHandle.isReceivedRdHup()) {
767                                 shutdownInput(true);
768                             }
769                             if (spliceInResult) {
770                                 // We need to check if it is still active as if not we removed all SpliceTasks in
771                                 // doClose(...)
772                                 if (isActive()) {
773                                     sQueue.remove();
774                                 }
775                                 continue;
776                             } else {
777                                 break;
778                             }
779                         }
780                     }
781 
782                     // we use a direct buffer here as the native implementations only be able
783                     // to handle direct buffers.
784                     byteBuf = allocHandle.allocate(allocator);
785                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
786                     if (allocHandle.lastBytesRead() <= 0) {
787                         // nothing was read, release the buffer.
788                         byteBuf.release();
789                         byteBuf = null;
790                         close = allocHandle.lastBytesRead() < 0;
791                         if (close) {
792                             // There is nothing left to read as we received an EOF.
793                             readPending = false;
794                         }
795                         break;
796                     }
797                     allocHandle.incMessagesRead(1);
798                     readPending = false;
799                     pipeline.fireChannelRead(byteBuf);
800                     byteBuf = null;
801 
802                     if (shouldBreakEpollInReady(config)) {
803                         // We need to do this for two reasons:
804                         //
805                         // - If the input was shutdown in between (which may be the case when the user did it in the
806                         //   fireChannelRead(...) method we should not try to read again to not produce any
807                         //   miss-leading exceptions.
808                         //
809                         // - If the user closes the channel we need to ensure we not try to read from it again as
810                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
811                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
812                         //   reading data from a filedescriptor that belongs to another socket then the socket that
813                         //   was "wrapped" by this Channel implementation.
814                         break;
815                     }
816                 } while (allocHandle.continueReading());
817 
818                 allocHandle.readComplete();
819                 pipeline.fireChannelReadComplete();
820 
821                 if (close) {
822                     shutdownInput(false);
823                 }
824             } catch (Throwable t) {
825                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
826             } finally {
827                 if (sQueue == null) {
828                     epollInFinally(config);
829                 } else {
830                     if (!config.isAutoRead()) {
831                         clearEpollIn();
832                     }
833                 }
834             }
835         }
836     }
837 
838     private void addToSpliceQueue(final SpliceInTask task) {
839         Queue<SpliceInTask> sQueue = spliceQueue;
840         if (sQueue == null) {
841             synchronized (this) {
842                 sQueue = spliceQueue;
843                 if (sQueue == null) {
844                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
845                 }
846             }
847         }
848         sQueue.add(task);
849     }
850 
851     protected abstract class SpliceInTask {
852         final ChannelPromise promise;
853         int len;
854 
855         protected SpliceInTask(int len, ChannelPromise promise) {
856             this.promise = promise;
857             this.len = len;
858         }
859 
860         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
861 
862         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
863             // calculate the maximum amount of data we are allowed to splice
864             int length = Math.min(handle.guess(), len);
865             int splicedIn = 0;
866             for (;;) {
867                 // Splicing until there is nothing left to splice.
868                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
869                 handle.lastBytesRead(localSplicedIn);
870                 if (localSplicedIn == 0) {
871                     break;
872                 }
873                 splicedIn += localSplicedIn;
874                 length -= localSplicedIn;
875             }
876 
877             return splicedIn;
878         }
879     }
880 
881     // Let it directly implement channelFutureListener as well to reduce object creation.
882     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
883         private final AbstractEpollStreamChannel ch;
884 
885         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
886             super(len, promise);
887             this.ch = ch;
888         }
889 
890         @Override
891         public void operationComplete(ChannelFuture future) throws Exception {
892             if (!future.isSuccess()) {
893                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
894                 promise.tryFailure(future.cause());
895             }
896         }
897 
898         @Override
899         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
900             assert ch.eventLoop().inEventLoop();
901             if (len == 0) {
902                 // Use trySuccess() as the promise might already be closed by spliceTo(...)
903                 promise.trySuccess();
904                 return true;
905             }
906             try {
907                 // We create the pipe on the target channel as this will allow us to just handle pending writes
908                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
909                 // on multiple Channels pointing to one target Channel.
910                 FileDescriptor pipeOut = ch.pipeOut;
911                 if (pipeOut == null) {
912                     // Create a new pipe as non was created before.
913                     FileDescriptor[] pipe = pipe();
914                     ch.pipeIn = pipe[0];
915                     pipeOut = ch.pipeOut = pipe[1];
916                 }
917 
918                 int splicedIn = spliceIn(pipeOut, handle);
919                 if (splicedIn > 0) {
920                     // Integer.MAX_VALUE is a special value which will result in splice forever.
921                     if (len != Integer.MAX_VALUE) {
922                         len -= splicedIn;
923                     }
924 
925                     // Depending on if we are done with splicing inbound data we set the right promise for the
926                     // outbound splicing.
927                     final ChannelPromise splicePromise;
928                     if (len == 0) {
929                         splicePromise = promise;
930                     } else {
931                         splicePromise = ch.newPromise().addListener(this);
932                     }
933 
934                     boolean autoRead = config().isAutoRead();
935 
936                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
937                     // case.
938                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
939                     ch.unsafe().flush();
940                     if (autoRead && !splicePromise.isDone()) {
941                         // Write was not done which means the target channel was not writable. In this case we need to
942                         // disable reading until we are done with splicing to the target channel because:
943                         //
944                         // - The user may want to trigger another splice operation once the splicing was complete.
945                         config().setAutoRead(false);
946                     }
947                 }
948 
949                 return len == 0;
950             } catch (Throwable cause) {
951                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
952                 promise.tryFailure(cause);
953                 return true;
954             }
955         }
956     }
957 
958     private final class SpliceOutTask {
959         private final AbstractEpollStreamChannel ch;
960         private final boolean autoRead;
961         private int len;
962 
963         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
964             this.ch = ch;
965             this.len = len;
966             this.autoRead = autoRead;
967         }
968 
969         public boolean spliceOut() throws Exception {
970             assert ch.eventLoop().inEventLoop();
971             try {
972                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
973                 len -= splicedOut;
974                 if (len == 0) {
975                     if (autoRead) {
976                         // AutoRead was used and we spliced everything so start reading again
977                         config().setAutoRead(true);
978                     }
979                     return true;
980                 }
981                 return false;
982             } catch (IOException e) {
983                 if (autoRead) {
984                     // AutoRead was used and we spliced everything so start reading again
985                     config().setAutoRead(true);
986                 }
987                 throw e;
988             }
989         }
990     }
991 
992     private final class SpliceFdTask extends SpliceInTask {
993         private final FileDescriptor fd;
994         private final ChannelPromise promise;
995         private int offset;
996 
997         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
998             super(len, promise);
999             this.fd = fd;
1000             this.promise = promise;
1001             this.offset = offset;
1002         }
1003 
1004         @Override
1005         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1006             assert eventLoop().inEventLoop();
1007             if (len == 0) {
1008                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1009                 promise.trySuccess();
1010                 return true;
1011             }
1012 
1013             try {
1014                 FileDescriptor[] pipe = pipe();
1015                 FileDescriptor pipeIn = pipe[0];
1016                 FileDescriptor pipeOut = pipe[1];
1017                 try {
1018                     int splicedIn = spliceIn(pipeOut, handle);
1019                     if (splicedIn > 0) {
1020                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1021                         if (len != Integer.MAX_VALUE) {
1022                             len -= splicedIn;
1023                         }
1024                         do {
1025                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1026                             offset += splicedOut;
1027                             splicedIn -= splicedOut;
1028                         } while (splicedIn > 0);
1029                         if (len == 0) {
1030                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1031                             promise.trySuccess();
1032                             return true;
1033                         }
1034                     }
1035                     return false;
1036                 } finally {
1037                     safeClosePipe(pipeIn);
1038                     safeClosePipe(pipeOut);
1039                 }
1040             } catch (Throwable cause) {
1041                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1042                 promise.tryFailure(cause);
1043                 return true;
1044             }
1045         }
1046     }
1047 
1048     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1049         EpollSocketWritableByteChannel() {
1050             super(socket);
1051             assert fd == socket;
1052         }
1053 
1054         @Override
1055         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1056             return socket.send(buf, pos, limit);
1057         }
1058 
1059         @Override
1060         protected ByteBufAllocator alloc() {
1061             return AbstractEpollStreamChannel.this.alloc();
1062         }
1063     }
1064 }