1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.WritableByteChannel;
48  import java.util.Queue;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53  import static io.netty.channel.unix.FileDescriptor.pipe;
54  import static io.netty.util.internal.ObjectUtil.checkNotNull;
55  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56  
57  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
58      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
59      private static final String EXPECTED_TYPES =
60              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
61                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63  
64      private final Runnable flushTask = new Runnable() {
65          @Override
66          public void run() {
67              // Calling flush0 directly to ensure we do not try to flush messages that were added via write(...)
68              // in the meantime.
69              ((AbstractEpollUnsafe) unsafe()).flush0();
70          }
71      };
72  
73      // Lazy init these if we need to splice(...)
74      private volatile Queue<SpliceInTask> spliceQueue;
75      private FileDescriptor pipeIn;
76      private FileDescriptor pipeOut;
77  
78      private WritableByteChannel byteChannel;
79  
80      protected AbstractEpollStreamChannel(Channel parent, int fd) {
81          this(parent, new LinuxSocket(fd));
82      }
83  
84      protected AbstractEpollStreamChannel(int fd) {
85          this(new LinuxSocket(fd));
86      }
87  
88      AbstractEpollStreamChannel(LinuxSocket fd) {
89          this(fd, isSoErrorZero(fd));
90      }
91  
92      AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
93          // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
94          super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
95      }
96  
97      protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
98          // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
99          super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
100     }
101 
102     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
103         // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
104         super(null, fd, active, EpollIoOps.EPOLLRDHUP);
105     }
106 
107     @Override
108     protected AbstractEpollUnsafe newUnsafe() {
109         return new EpollStreamUnsafe();
110     }
111 
112     @Override
113     public ChannelMetadata metadata() {
114         return METADATA;
115     }
116 
117     /**
118      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
119      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
120      * splice until the {@link ChannelFuture} is canceled or fails.
121      *
122      * Please note:
123      * <ul>
124      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
125      *   {@link IllegalArgumentException} is thrown. </li>
126      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
127      *   target {@link AbstractEpollStreamChannel}</li>
128      * </ul>
129      *
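     * <p>A minimal usage sketch (the {@code source} and {@code target} channels below are illustrative):
     * <pre>{@code
     * AbstractEpollStreamChannel source = ...; // registered to the same EventLoop as target,
     * AbstractEpollStreamChannel target = ...; // both using EpollMode.LEVEL_TRIGGERED
     * source.spliceTo(target, 8192).addListener(new ChannelFutureListener() {
     *     @Override
     *     public void operationComplete(ChannelFuture future) {
     *         if (!future.isSuccess()) {
     *             // Splicing failed or was canceled.
     *         }
     *     }
     * });
     * }</pre>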
130      */
131     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
132         return spliceTo(ch, len, newPromise());
133     }
134 
135     /**
136      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
137      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
138      * splice until the {@link ChannelFuture} is canceled or fails.
139      *
140      * Please note:
141      * <ul>
142      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
143      *   {@link IllegalArgumentException} is thrown. </li>
144      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
145      *   target {@link AbstractEpollStreamChannel}</li>
146      * </ul>
147      *
148      */
149     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
150                                         final ChannelPromise promise) {
151         if (ch.eventLoop() != eventLoop()) {
152             throw new IllegalArgumentException("EventLoops are not the same.");
153         }
154         checkPositiveOrZero(len, "len");
155         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
156                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
157             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
158         }
159         checkNotNull(promise, "promise");
160         if (!isOpen()) {
161             promise.tryFailure(new ClosedChannelException());
162         } else {
163             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
164             failSpliceIfClosed(promise);
165         }
166         return promise;
167     }
168 
169     /**
170      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
171      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
172      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
173      * {@link ChannelFuture} is canceled or fails.
174      *
175      * Please note:
176      * <ul>
177      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
178      *   {@link AbstractEpollStreamChannel}</li>
179      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
180      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
181      * </ul>
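     *
     * <p>A minimal usage sketch (the {@code channel} and {@code fileFd} names below are illustrative); it
     * splices up to 1 MiB from the channel into the file starting at offset 0:
     * <pre>{@code
     * AbstractEpollStreamChannel channel = ...; // using EpollMode.LEVEL_TRIGGERED and registered to an event loop
     * FileDescriptor fileFd = ...;              // an open file descriptor, e.g. obtained via FileDescriptor.from(...)
     * channel.spliceTo(fileFd, 0, 1024 * 1024).addListener(new ChannelFutureListener() {
     *     @Override
     *     public void operationComplete(ChannelFuture future) {
     *         // The file descriptor is not closed by the splice; close it here once it is no longer needed.
     *     }
     * });
     * }</pre>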
182      */
183     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
184         return spliceTo(ch, offset, len, newPromise());
185     }
186 
187     /**
188      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
189      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
190      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
191      * {@link ChannelFuture} is canceled or fails.
192      *
193      * Please note:
194      * <ul>
195      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
196      *   {@link AbstractEpollStreamChannel}</li>
197      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
198      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
199      * </ul>
200      */
201     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
202                                         final ChannelPromise promise) {
203         checkPositiveOrZero(len, "len");
204         checkPositiveOrZero(offset, "offset");
205         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
206             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
207         }
208         checkNotNull(promise, "promise");
209         if (!isOpen()) {
210             promise.tryFailure(new ClosedChannelException());
211         } else {
212             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
213             failSpliceIfClosed(promise);
214         }
215         return promise;
216     }
217 
218     private void failSpliceIfClosed(ChannelPromise promise) {
219         if (!isOpen()) {
220             // Seems like the Channel was closed in the meantime; try to fail the promise to prevent
221             // cases where the future would otherwise never be notified.
222             if (!promise.isDone()) {
223                 final ClosedChannelException ex = new ClosedChannelException();
224                 if (promise.tryFailure(ex)) {
225                     eventLoop().execute(new Runnable() {
226                         @Override
227                         public void run() {
228                             // Call this via the EventLoop as it is an MPSC queue.
229                             clearSpliceQueue(ex);
230                         }
231                     });
232                 }
233             }
234         }
235     }
236 
237     /**
238      * Write bytes from the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
239      * @param in the collection which contains objects to write.
240      * @param buf the {@link ByteBuf} from which the bytes should be written
241      * @return The value that should be decremented from the write quantum which starts at
242      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
243      * <ul>
244      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
245      *     is encountered</li>
246      *     <li>1 - if a single call to write data was made to the OS</li>
247      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
248      *     no data was accepted</li>
249      * </ul>
250      */
251     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
252         int readableBytes = buf.readableBytes();
253         if (readableBytes == 0) {
254             in.remove();
255             return 0;
256         }
257 
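        // A buffer with a single memory region can be written with a plain write; composite or multi-component
        // buffers are flattened into their NIO buffers and written with a gathering write.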
258         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
259             return doWriteBytes(in, buf);
260         } else {
261             ByteBuffer[] nioBuffers = buf.nioBuffers();
262             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
263                     config().getMaxBytesPerGatheringWrite());
264         }
265     }
266 
267     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
268         // By default we track the SO_SNDBUF whenever it is explicitly set. However some OSes may dynamically change
269         // SO_SNDBUF (and other characteristics that determine how much data can be written at once), so we make a
270         // best effort to adjust as the OS behavior changes.
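        // Illustrative example: if we attempted 8 KiB and all of it was written, the limit may grow to 16 KiB;
        // if we attempted 64 KiB but less than 32 KiB was accepted, the limit shrinks to 32 KiB.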
271         if (attempted == written) {
272             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
273                 config().setMaxBytesPerGatheringWrite(attempted << 1);
274             }
275         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
276             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
277         }
278     }
279 
280     /**
281      * Write multiple bytes via {@link IovArray}.
282      * @param in the collection which contains objects to write.
283      * @param array The array which contains the content to write.
284      * @return The value that should be decremented from the write quantum which starts at
285      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
286      * <ul>
287      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
288      *     is encountered</li>
289      *     <li>1 - if a single call to write data was made to the OS</li>
290      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
291      *     no data was accepted</li>
292      * </ul>
293      * @throws IOException If an I/O exception occurs during write.
294      */
295     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
296         final long expectedWrittenBytes = array.size();
297         assert expectedWrittenBytes != 0;
298         final int cnt = array.count();
299         assert cnt != 0;
300 
301         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
302         if (localWrittenBytes > 0) {
303             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
304             in.removeBytes(localWrittenBytes);
305             return 1;
306         }
307         return WRITE_STATUS_SNDBUF_FULL;
308     }
309 
310     /**
311      * Write multiple bytes via {@link ByteBuffer} array.
312      * @param in the collection which contains objects to write.
313      * @param nioBuffers The buffers to write.
314      * @param nioBufferCnt The number of buffers to write.
315      * @param expectedWrittenBytes The number of bytes we expect to write.
316      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
317      * @return The value that should be decremented from the write quantum which starts at
318      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
319      * <ul>
320      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
321      *     is encountered</li>
322      *     <li>1 - if a single call to write data was made to the OS</li>
323      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
324      *     no data was accepted</li>
325      * </ul>
326      * @throws IOException If an I/O exception occurs during write.
327      */
328     private int writeBytesMultiple(
329             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
330             long maxBytesPerGatheringWrite) throws IOException {
331         assert expectedWrittenBytes != 0;
332         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
333             expectedWrittenBytes = maxBytesPerGatheringWrite;
334         }
335 
336         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
337         if (localWrittenBytes > 0) {
338             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
339             in.removeBytes(localWrittenBytes);
340             return 1;
341         }
342         return WRITE_STATUS_SNDBUF_FULL;
343     }
344 
345     /**
346      * Write a {@link DefaultFileRegion}
347      * @param in the collection which contains objects to write.
348      * @param region the {@link DefaultFileRegion} from which the bytes should be written
349      * @return The value that should be decremented from the write quantum which starts at
350      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
351      * <ul>
352      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
353      *     is encountered</li>
354      *     <li>1 - if a single call to write data was made to the OS</li>
355      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
356      *     no data was accepted</li>
357      * </ul>
358      */
359     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
360         final long offset = region.transferred();
361         final long regionCount = region.count();
362         if (offset >= regionCount) {
363             in.remove();
364             return 0;
365         }
366 
367         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
368         if (flushedAmount > 0) {
369             in.progress(flushedAmount);
370             if (region.transferred() >= regionCount) {
371                 in.remove();
372             }
373             return 1;
374         } else if (flushedAmount == 0) {
375             validateFileRegion(region, offset);
376         }
377         return WRITE_STATUS_SNDBUF_FULL;
378     }
379 
380     /**
381      * Write a {@link FileRegion}
382      * @param in the collection which contains objects to write.
383      * @param region the {@link FileRegion} from which the bytes should be written
384      * @return The value that should be decremented from the write quantum which starts at
385      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
386      * <ul>
387      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
388      *     is encountered</li>
389      *     <li>1 - if a single call to write data was made to the OS</li>
390      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
391      *     no data was accepted</li>
392      * </ul>
393      */
394     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
395         if (region.transferred() >= region.count()) {
396             in.remove();
397             return 0;
398         }
399 
400         if (byteChannel == null) {
401             byteChannel = new EpollSocketWritableByteChannel();
402         }
403         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
404         if (flushedAmount > 0) {
405             in.progress(flushedAmount);
406             if (region.transferred() >= region.count()) {
407                 in.remove();
408             }
409             return 1;
410         }
411         return WRITE_STATUS_SNDBUF_FULL;
412     }
413 
414     @Override
415     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
416         int writeSpinCount = config().getWriteSpinCount();
417         do {
418             final int msgCount = in.size();
419             // Do a gathering write if the outbound buffer holds more than one message and starts with a ByteBuf.
420             if (msgCount > 1 && in.current() instanceof ByteBuf) {
421                 writeSpinCount -= doWriteMultiple(in);
422             } else if (msgCount == 0) {
423                 // Wrote all messages.
424                 clearFlag(Native.EPOLLOUT);
425                 // Return here so we do not set the EPOLLOUT flag.
426                 return;
427             } else {  // msgCount == 1
428                 writeSpinCount -= doWriteSingle(in);
429             }
430 
431             // We do not break the loop here even if the outbound buffer was flushed completely,
432             // because a user might have triggered another write and flush when we notify his or her
433             // listeners.
434         } while (writeSpinCount > 0);
435 
436         if (writeSpinCount == 0) {
437             // It is possible that we have set EPOLLOUT, been woken up by epoll because the socket is writable, and
438             // then used up our write quantum. In this case we no longer want to set the EPOLLOUT flag because the
439             // socket is still writable (as far as we know). We will find out the next time we attempt to write
440             // whether the socket is still writable and set EPOLLOUT if necessary.
441             clearFlag(Native.EPOLLOUT);
442 
443             // We used our writeSpin quantum, and should try to write again later.
444             eventLoop().execute(flushTask);
445         } else {
446             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
447             // when it can accept more data.
448             setFlag(Native.EPOLLOUT);
449         }
450     }
451 
452     /**
453      * Attempt to write a single object.
454      * @param in the collection which contains objects to write.
455      * @return The value that should be decremented from the write quantum which starts at
456      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
457      * <ul>
458      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
459      *     is encountered</li>
460      *     <li>1 - if a single call to write data was made to the OS</li>
461      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
462      *     no data was accepted</li>
463      * </ul>
464      * @throws Exception If an I/O error occurs.
465      */
466     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
467         // The outbound buffer contains only one message or it contains a file region.
468         Object msg = in.current();
469         if (msg instanceof ByteBuf) {
470             return writeBytes(in, (ByteBuf) msg);
471         } else if (msg instanceof DefaultFileRegion) {
472             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
473         } else if (msg instanceof FileRegion) {
474             return writeFileRegion(in, (FileRegion) msg);
475         } else if (msg instanceof SpliceOutTask) {
476             if (!((SpliceOutTask) msg).spliceOut()) {
477                 return WRITE_STATUS_SNDBUF_FULL;
478             }
479             in.remove();
480             return 1;
481         } else {
482             // Should never reach here.
483             throw new Error();
484         }
485     }
486 
487     /**
488      * Attempt to write multiple {@link ByteBuf} objects.
489      * @param in the collection which contains objects to write.
490      * @return The value that should be decremented from the write quantum which starts at
491      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
492      * <ul>
493      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
494      *     is encountered</li>
495      *     <li>1 - if a single call to write data was made to the OS</li>
496      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
497      *     no data was accepted</li>
498      * </ul>
499      * @throws Exception If an I/O error occurs.
500      */
501     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
502         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
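        // Reuse the IoHandler's shared IovArray (cleared for this write) and cap it at the current
        // gathering-write limit before collecting the flushed ByteBufs into it.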
503         IovArray array = registration().ioHandler().cleanIovArray();
504         array.maxBytes(maxBytesPerGatheringWrite);
505         in.forEachFlushedMessage(array);
506 
507         if (array.count() >= 1) {
508             // TODO: Handle the case where cnt == 1 specially.
509             return writeBytesMultiple(in, array);
510         }
511         // cnt == 0, which means the outbound buffer contained empty buffers only.
512         in.removeBytes(0);
513         return 0;
514     }
515 
516     @Override
517     protected Object filterOutboundMessage(Object msg) {
518         if (msg instanceof ByteBuf) {
519             ByteBuf buf = (ByteBuf) msg;
520             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
521         }
522 
523         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
524             return msg;
525         }
526 
527         throw new UnsupportedOperationException(
528                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
529     }
530 
531     @Override
532     protected final void doShutdownOutput() throws Exception {
533         socket.shutdown(false, true);
534     }
535 
536     private void shutdownInput0(final ChannelPromise promise) {
537         try {
538             socket.shutdown(true, false);
539             promise.setSuccess();
540         } catch (Throwable cause) {
541             promise.setFailure(cause);
542         }
543     }
544 
545     @Override
546     public boolean isOutputShutdown() {
547         return socket.isOutputShutdown();
548     }
549 
550     @Override
551     public boolean isInputShutdown() {
552         return socket.isInputShutdown();
553     }
554 
555     @Override
556     public boolean isShutdown() {
557         return socket.isShutdown();
558     }
559 
560     @Override
561     public ChannelFuture shutdownOutput() {
562         return shutdownOutput(newPromise());
563     }
564 
565     @Override
566     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
567         EventLoop loop = eventLoop();
568         if (loop.inEventLoop()) {
569             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
570         } else {
571             loop.execute(new Runnable() {
572                 @Override
573                 public void run() {
574                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
575                 }
576             });
577         }
578 
579         return promise;
580     }
581 
582     @Override
583     public ChannelFuture shutdownInput() {
584         return shutdownInput(newPromise());
585     }
586 
587     @Override
588     public ChannelFuture shutdownInput(final ChannelPromise promise) {
589         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
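        // prepareToClose() may return an Executor (for example when SO_LINGER is configured) on which the
        // potentially blocking shutdown should run instead of the EventLoop.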
590         if (closeExecutor != null) {
591             closeExecutor.execute(new Runnable() {
592                 @Override
593                 public void run() {
594                     shutdownInput0(promise);
595                 }
596             });
597         } else {
598             EventLoop loop = eventLoop();
599             if (loop.inEventLoop()) {
600                 shutdownInput0(promise);
601             } else {
602                 loop.execute(new Runnable() {
603                     @Override
604                     public void run() {
605                         shutdownInput0(promise);
606                     }
607                 });
608             }
609         }
610         return promise;
611     }
612 
613     @Override
614     public ChannelFuture shutdown() {
615         return shutdown(newPromise());
616     }
617 
618     @Override
619     public ChannelFuture shutdown(final ChannelPromise promise) {
620         ChannelFuture shutdownOutputFuture = shutdownOutput();
621         if (shutdownOutputFuture.isDone()) {
622             shutdownOutputDone(shutdownOutputFuture, promise);
623         } else {
624             shutdownOutputFuture.addListener(new ChannelFutureListener() {
625                 @Override
626                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
627                     shutdownOutputDone(shutdownOutputFuture, promise);
628                 }
629             });
630         }
631         return promise;
632     }
633 
634     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
635         ChannelFuture shutdownInputFuture = shutdownInput();
636         if (shutdownInputFuture.isDone()) {
637             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
638         } else {
639             shutdownInputFuture.addListener(new ChannelFutureListener() {
640                 @Override
641                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
642                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
643                 }
644             });
645         }
646     }
647 
648     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
649                               ChannelFuture shutdownInputFuture,
650                               ChannelPromise promise) {
651         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
652         Throwable shutdownInputCause = shutdownInputFuture.cause();
653         if (shutdownOutputCause != null) {
654             if (shutdownInputCause != null) {
655                 logger.debug("Exception suppressed because a previous exception occurred.",
656                         shutdownInputCause);
657             }
658             promise.setFailure(shutdownOutputCause);
659         } else if (shutdownInputCause != null) {
660             promise.setFailure(shutdownInputCause);
661         } else {
662             promise.setSuccess();
663         }
664     }
665 
666     @Override
667     protected void doClose() throws Exception {
668         try {
669             // Calling super.doClose() first so spliceTo(...) will fail on next call.
670             super.doClose();
671         } finally {
672             safeClosePipe(pipeIn);
673             safeClosePipe(pipeOut);
674             clearSpliceQueue(null);
675         }
676     }
677 
678     private void clearSpliceQueue(ClosedChannelException exception) {
679         Queue<SpliceInTask> sQueue = spliceQueue;
680         if (sQueue == null) {
681             return;
682         }
683         for (;;) {
684             SpliceInTask task = sQueue.poll();
685             if (task == null) {
686                 break;
687             }
688             if (exception == null) {
689                 exception = new ClosedChannelException();
690             }
691             task.promise.tryFailure(exception);
692         }
693     }
694 
695     private static void safeClosePipe(FileDescriptor fd) {
696         if (fd != null) {
697             try {
698                 fd.close();
699             } catch (IOException e) {
700                 logger.warn("Error while closing a pipe", e);
701             }
702         }
703     }
704 
705     class EpollStreamUnsafe extends AbstractEpollUnsafe {
706         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
707         @Override
708         protected Executor prepareToClose() {
709             return super.prepareToClose();
710         }
711 
712         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
713                                          boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
714             if (byteBuf != null) {
715                 if (byteBuf.isReadable()) {
716                     readPending = false;
717                     pipeline.fireChannelRead(byteBuf);
718                 } else {
719                     byteBuf.release();
720                 }
721             }
722             allocHandle.readComplete();
723             pipeline.fireChannelReadComplete();
724             pipeline.fireExceptionCaught(cause);
725 
726             // On EOF, OutOfMemoryError or IOException shut down the input to release the connection.
727             // See https://github.com/netty/netty/issues/10434
728             if (allDataRead || cause instanceof OutOfMemoryError || cause instanceof IOException) {
729                 shutdownInput(true);
730             }
731         }
732 
733         @Override
734         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
735             return new EpollRecvByteAllocatorStreamingHandle(handle);
736         }
737 
738         @Override
739         void epollInReady() {
740             final ChannelConfig config = config();
741             if (shouldBreakEpollInReady(config)) {
742                 clearEpollIn0();
743                 return;
744             }
745             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
746             final ChannelPipeline pipeline = pipeline();
747             final ByteBufAllocator allocator = config.getAllocator();
748             allocHandle.reset(config);
749 
750             ByteBuf byteBuf = null;
751             boolean allDataRead = false;
752             Queue<SpliceInTask> sQueue = null;
753             try {
754                 do {
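                    // If splice tasks are pending, inbound data is consumed by splicing into the pipe
                    // instead of being read into a ByteBuf and fired through the pipeline.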
755                     if (sQueue != null || (sQueue = spliceQueue) != null) {
756                         SpliceInTask spliceTask = sQueue.peek();
757                         if (spliceTask != null) {
758                             boolean spliceInResult = spliceTask.spliceIn(allocHandle);
759 
760                             if (allocHandle.isReceivedRdHup()) {
761                                 shutdownInput(false);
762                             }
763                             if (spliceInResult) {
764                                 // We need to check if the channel is still active, as if not all SpliceTasks were
765                                 // already removed in doClose(...)
766                                 if (isActive()) {
767                                     sQueue.remove();
768                                 }
769                                 continue;
770                             } else {
771                                 break;
772                             }
773                         }
774                     }
775 
776                     // We use a direct buffer here as the native implementation is only able
777                     // to handle direct buffers.
778                     byteBuf = allocHandle.allocate(allocator);
779                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
780                     if (allocHandle.lastBytesRead() <= 0) {
781                         // nothing was read, release the buffer.
782                         byteBuf.release();
783                         byteBuf = null;
784                         allDataRead = allocHandle.lastBytesRead() < 0;
785                         if (allDataRead) {
786                             // There is nothing left to read as we received an EOF.
787                             readPending = false;
788                         }
789                         break;
790                     }
791                     allocHandle.incMessagesRead(1);
792                     readPending = false;
793                     pipeline.fireChannelRead(byteBuf);
794                     byteBuf = null;
795 
796                     if (shouldBreakEpollInReady(config)) {
797                         // We need to do this for two reasons:
798                         //
799                         // - If the input was shut down in between (which may be the case when the user did it in the
800                         //   fireChannelRead(...) method) we should not try to read again, to not produce any
801                         //   misleading exceptions.
802                         //
803                         // - If the user closes the channel we need to ensure we do not try to read from it again as
804                         //   the file descriptor may already be re-used by the OS if the system is handling a lot of
805                         //   concurrent connections and so needs a lot of file descriptors. If we do not do this we
806                         //   risk reading data from a file descriptor that belongs to another socket than the one
807                         //   that was "wrapped" by this Channel implementation.
808                         break;
809                     }
810                 } while (allocHandle.continueReading());
811 
812                 allocHandle.readComplete();
813                 pipeline.fireChannelReadComplete();
814 
815                 if (allDataRead) {
816                     shutdownInput(true);
817                 }
818             } catch (Throwable t) {
819                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
820             } finally {
821                 if (sQueue == null) {
822                     if (shouldStopReading(config)) {
823                         clearEpollIn();
824                     }
825                 } else {
826                     if (!config.isAutoRead()) {
827                         clearEpollIn();
828                     }
829                 }
830             }
831         }
832     }
833 
834     private void addToSpliceQueue(final SpliceInTask task) {
835         Queue<SpliceInTask> sQueue = spliceQueue;
836         if (sQueue == null) {
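            // Lazily create the queue with double-checked locking; tasks may be added from any thread but the
            // queue is only drained from the EventLoop.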
837             synchronized (this) {
838                 sQueue = spliceQueue;
839                 if (sQueue == null) {
840                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
841                 }
842             }
843         }
844         sQueue.add(task);
845     }
846 
847     protected abstract class SpliceInTask {
848         final ChannelPromise promise;
849         int len;
850 
851         protected SpliceInTask(int len, ChannelPromise promise) {
852             this.promise = promise;
853             this.len = len;
854         }
855 
856         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
857 
858         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
859             // calculate the maximum amount of data we are allowed to splice
860             int length = Math.min(handle.guess(), len);
861             int splicedIn = 0;
862             for (;;) {
863                 // Splice until there is nothing left to splice.
864                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
865                 handle.lastBytesRead(localSplicedIn);
866                 if (localSplicedIn == 0) {
867                     break;
868                 }
869                 splicedIn += localSplicedIn;
870                 length -= localSplicedIn;
871             }
872 
873             return splicedIn;
874         }
875     }
876 
877     // Let it directly implement ChannelFutureListener as well to reduce object creation.
878     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
879         private final AbstractEpollStreamChannel ch;
880 
881         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
882             super(len, promise);
883             this.ch = ch;
884         }
885 
886         @Override
887         public void operationComplete(ChannelFuture future) throws Exception {
888             if (!future.isSuccess()) {
889                 // Use tryFailure(...) as the promise might already be completed by spliceTo(...)
890                 promise.tryFailure(future.cause());
891             }
892         }
893 
894         @Override
895         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
896             assert ch.eventLoop().inEventLoop();
897             if (len == 0) {
898                 // Use trySuccess() as the promise might already be completed by spliceTo(...)
899                 promise.trySuccess();
900                 return true;
901             }
902             try {
903                 // We create the pipe on the target channel as this will allow us to just handle pending writes
904                 // later in a correct fashion without getting into any ordering issues when spliceTo(...) is called
905                 // on multiple Channels pointing to one target Channel.
906                 FileDescriptor pipeOut = ch.pipeOut;
907                 if (pipeOut == null) {
908                     // Create a new pipe as none was created before.
909                     FileDescriptor[] pipe = pipe();
910                     ch.pipeIn = pipe[0];
911                     pipeOut = ch.pipeOut = pipe[1];
912                 }
913 
914                 int splicedIn = spliceIn(pipeOut, handle);
915                 if (splicedIn > 0) {
916                     // Integer.MAX_VALUE is a special value which will result in splicing forever.
917                     if (len != Integer.MAX_VALUE) {
918                         len -= splicedIn;
919                     }
920 
921                     // Depending on whether we are done with splicing inbound data we set the right promise for the
922                     // outbound splicing.
923                     final ChannelPromise splicePromise;
924                     if (len == 0) {
925                         splicePromise = promise;
926                     } else {
927                         splicePromise = ch.newPromise().addListener(this);
928                     }
929 
930                     boolean autoRead = config().isAutoRead();
931 
932                     // Just call unsafe().write(...) and flush() as we do not want to traverse the whole pipeline
933                     // for this case.
934                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
935                     ch.unsafe().flush();
936                     if (autoRead && !splicePromise.isDone()) {
937                         // Write was not done which means the target channel was not writable. In this case we need to
938                         // disable reading until we are done with splicing to the target channel because:
939                         //
940                         // - The user may want to trigger another splice operation once the splicing is complete.
941                         config().setAutoRead(false);
942                     }
943                 }
944 
945                 return len == 0;
946             } catch (Throwable cause) {
947                 // Use tryFailure(...) as the promise might already be completed by spliceTo(...)
948                 promise.tryFailure(cause);
949                 return true;
950             }
951         }
952     }
953 
954     private final class SpliceOutTask {
955         private final AbstractEpollStreamChannel ch;
956         private final boolean autoRead;
957         private int len;
958 
959         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
960             this.ch = ch;
961             this.len = len;
962             this.autoRead = autoRead;
963         }
964 
965         public boolean spliceOut() throws Exception {
966             assert ch.eventLoop().inEventLoop();
967             try {
968                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
969                 len -= splicedOut;
970                 if (len == 0) {
971                     if (autoRead) {
972                         // AutoRead was used and we spliced everything so start reading again
973                         config().setAutoRead(true);
974                     }
975                     return true;
976                 }
977                 return false;
978             } catch (IOException e) {
979                 if (autoRead) {
980                     // AutoRead was used; restore it so the channel resumes reading even though splicing failed.
981                     config().setAutoRead(true);
982                 }
983                 throw e;
984             }
985         }
986     }
987 
988     private final class SpliceFdTask extends SpliceInTask {
989         private final FileDescriptor fd;
990         private final ChannelPromise promise;
991         private int offset;
992 
993         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
994             super(len, promise);
995             this.fd = fd;
996             this.promise = promise;
997             this.offset = offset;
998         }
999 
1000         @Override
1001         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1002             assert eventLoop().inEventLoop();
1003             if (len == 0) {
1004                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1005                 promise.trySuccess();
1006                 return true;
1007             }
1008 
1009             try {
1010                 FileDescriptor[] pipe = pipe();
1011                 FileDescriptor pipeIn = pipe[0];
1012                 FileDescriptor pipeOut = pipe[1];
1013                 try {
1014                     int splicedIn = spliceIn(pipeOut, handle);
1015                     if (splicedIn > 0) {
1016                         // Integer.MAX_VALUE is a special value which will result in splicing forever.
1017                         if (len != Integer.MAX_VALUE) {
1018                             len -= splicedIn;
1019                         }
1020                         do {
1021                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1022                             offset += splicedOut;
1023                             splicedIn -= splicedOut;
1024                         } while (splicedIn > 0);
1025                         if (len == 0) {
1026                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1027                             promise.trySuccess();
1028                             return true;
1029                         }
1030                     }
1031                     return false;
1032                 } finally {
1033                     safeClosePipe(pipeIn);
1034                     safeClosePipe(pipeOut);
1035                 }
1036             } catch (Throwable cause) {
1037                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1038                 promise.tryFailure(cause);
1039                 return true;
1040             }
1041         }
1042     }
1043 
1044     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1045         EpollSocketWritableByteChannel() {
1046             super(socket);
1047             assert fd == socket;
1048         }
1049 
1050         @Override
1051         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1052             return socket.send(buf, pos, limit);
1053         }
1054 
1055         @Override
1056         protected ByteBufAllocator alloc() {
1057             return AbstractEpollStreamChannel.this.alloc();
1058         }
1059     }
1060 }