View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.WritableByteChannel;
48  import java.util.Queue;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53  import static io.netty.channel.unix.FileDescriptor.pipe;
54  import static io.netty.util.internal.ObjectUtil.checkNotNull;
55  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56  import static io.netty.util.internal.StringUtil.className;
57  
58  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60      private static final String EXPECTED_TYPES =
61              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64  
65      private final Runnable flushTask = new Runnable() {
66          @Override
67          public void run() {
68              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
69              // meantime.
70              ((AbstractEpollUnsafe) unsafe()).flush0();
71          }
72      };
73  
74      // Lazy init these if we need to splice(...)
75      private volatile Queue<SpliceInTask> spliceQueue;
76      private FileDescriptor pipeIn;
77      private FileDescriptor pipeOut;
78  
79      private WritableByteChannel byteChannel;
80  
/**
 * Creates a channel for the given parent, wrapping the raw descriptor in a {@link LinuxSocket}.
 *
 * @param parent the parent {@link Channel}
 * @param fd the raw file descriptor of the socket
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}
84  
/**
 * Creates a channel from a raw descriptor, wrapping it in a {@link LinuxSocket}.
 *
 * @param fd the raw file descriptor of the socket
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}
88  
/**
 * Creates a channel for the given socket; the {@code active} flag passed on is the result of
 * {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}
92  
/**
 * Creates a channel for the given parent and socket.
 *
 * @param parent the parent {@link Channel}
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    // EPOLLRDHUP is registered so that we get notified as soon as the remote peer closes the connection.
    super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
}
97  
/**
 * Creates a channel for the given parent and socket with a known remote address.
 *
 * @param parent the parent {@link Channel}
 * @param fd the socket backing this channel
 * @param remote the remote {@link SocketAddress}
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    // EPOLLRDHUP is registered so that we get notified as soon as the remote peer closes the connection.
    super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
}
102 
/**
 * Creates a parent-less channel for the given socket.
 *
 * @param fd the socket backing this channel
 * @param active the active flag forwarded to the super constructor
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    // EPOLLRDHUP is registered so that we get notified as soon as the remote peer closes the connection.
    super(null, fd, active, EpollIoOps.EPOLLRDHUP);
}
107 
108     @Override
109     protected AbstractEpollUnsafe newUnsafe() {
110         return new EpollStreamUnsafe();
111     }
112 
113     @Override
114     public ChannelMetadata metadata() {
115         return METADATA;
116     }
117 
118     /**
119      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
120      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
121      * splice until the {@link ChannelFuture} was canceled or it was failed.
122      *
123      * Please note:
124      * <ul>
125      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
126      *   {@link IllegalArgumentException} is thrown. </li>
127      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
128      *   target {@link AbstractEpollStreamChannel}</li>
129      * </ul>
130      *
131      */
132     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
133         return spliceTo(ch, len, newPromise());
134     }
135 
136     /**
137      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
138      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
139      * splice until the {@link ChannelFuture} was canceled or it was failed.
140      *
141      * Please note:
142      * <ul>
143      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
144      *   {@link IllegalArgumentException} is thrown. </li>
145      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
146      *   target {@link AbstractEpollStreamChannel}</li>
147      * </ul>
148      *
149      */
150     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
151                                         final ChannelPromise promise) {
152         if (ch.eventLoop() != eventLoop()) {
153             throw new IllegalArgumentException("EventLoops are not the same.");
154         }
155         checkPositiveOrZero(len, "len");
156         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
157                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
158             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
159         }
160         checkNotNull(promise, "promise");
161         if (!isOpen()) {
162             promise.tryFailure(new ClosedChannelException());
163         } else {
164             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
165             failSpliceIfClosed(promise);
166         }
167         return promise;
168     }
169 
170     /**
171      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
172      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
173      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
174      * {@link ChannelFuture} was canceled or it was failed.
175      *
176      * Please note:
177      * <ul>
178      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
179      *   {@link AbstractEpollStreamChannel}</li>
180      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
181      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
182      * </ul>
183      */
184     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
185         return spliceTo(ch, offset, len, newPromise());
186     }
187 
188     /**
189      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
190      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
191      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
192      * {@link ChannelFuture} was canceled or it was failed.
193      *
194      * Please note:
195      * <ul>
196      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
197      *   {@link AbstractEpollStreamChannel}</li>
198      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
199      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
200      * </ul>
201      */
202     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
203                                         final ChannelPromise promise) {
204         checkPositiveOrZero(len, "len");
205         checkPositiveOrZero(offset, "offset");
206         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
207             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
208         }
209         checkNotNull(promise, "promise");
210         if (!isOpen()) {
211             promise.tryFailure(new ClosedChannelException());
212         } else {
213             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
214             failSpliceIfClosed(promise);
215         }
216         return promise;
217     }
218 
219     private void failSpliceIfClosed(ChannelPromise promise) {
220         if (!isOpen()) {
221             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
222             // cases where a future may not be notified otherwise.
223             if (!promise.isDone()) {
224                 final ClosedChannelException ex = new ClosedChannelException();
225                 if (promise.tryFailure(ex)) {
226                     eventLoop().execute(new Runnable() {
227                         @Override
228                         public void run() {
229                             // Call this via the EventLoop as it is a MPSC queue.
230                             clearSpliceQueue(ex);
231                         }
232                     });
233                 }
234             }
235         }
236     }
237 
238     /**
239      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
240      * @param in the collection which contains objects to write.
241      * @param buf the {@link ByteBuf} from which the bytes should be written
242      * @return The value that should be decremented from the write quantum which starts at
243      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
244      * <ul>
245      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
246      *     is encountered</li>
247      *     <li>1 - if a single call to write data was made to the OS</li>
248      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
249      *     no data was accepted</li>
250      * </ul>
251      */
252     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
253         int readableBytes = buf.readableBytes();
254         if (readableBytes == 0) {
255             in.remove();
256             return 0;
257         }
258 
259         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
260             return doWriteBytes(in, buf);
261         } else {
262             ByteBuffer[] nioBuffers = buf.nioBuffers();
263             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
264                     config().getMaxBytesPerGatheringWrite());
265         }
266     }
267 
268     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
269         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
270         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
271         // make a best effort to adjust as OS behavior changes.
272         if (attempted == written) {
273             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
274                 config().setMaxBytesPerGatheringWrite(attempted << 1);
275             }
276         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
277             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
278         }
279     }
280 
281     /**
282      * Write multiple bytes via {@link IovArray}.
283      * @param in the collection which contains objects to write.
284      * @param array The array which contains the content to write.
285      * @return The value that should be decremented from the write quantum which starts at
286      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
287      * <ul>
288      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
289      *     is encountered</li>
290      *     <li>1 - if a single call to write data was made to the OS</li>
291      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
292      *     no data was accepted</li>
293      * </ul>
294      * @throws IOException If an I/O exception occurs during write.
295      */
296     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
297         final long expectedWrittenBytes = array.size();
298         assert expectedWrittenBytes != 0;
299         final int cnt = array.count();
300         assert cnt != 0;
301 
302         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
303         if (localWrittenBytes > 0) {
304             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
305             in.removeBytes(localWrittenBytes);
306             return 1;
307         }
308         return WRITE_STATUS_SNDBUF_FULL;
309     }
310 
311     /**
312      * Write multiple bytes via {@link ByteBuffer} array.
313      * @param in the collection which contains objects to write.
314      * @param nioBuffers The buffers to write.
315      * @param nioBufferCnt The number of buffers to write.
316      * @param expectedWrittenBytes The number of bytes we expect to write.
317      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
318      * @return The value that should be decremented from the write quantum which starts at
319      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
320      * <ul>
321      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
322      *     is encountered</li>
323      *     <li>1 - if a single call to write data was made to the OS</li>
324      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
325      *     no data was accepted</li>
326      * </ul>
327      * @throws IOException If an I/O exception occurs during write.
328      */
329     private int writeBytesMultiple(
330             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
331             long maxBytesPerGatheringWrite) throws IOException {
332         assert expectedWrittenBytes != 0;
333         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
334             expectedWrittenBytes = maxBytesPerGatheringWrite;
335         }
336 
337         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
338         if (localWrittenBytes > 0) {
339             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
340             in.removeBytes(localWrittenBytes);
341             return 1;
342         }
343         return WRITE_STATUS_SNDBUF_FULL;
344     }
345 
346     /**
347      * Write a {@link DefaultFileRegion}
348      * @param in the collection which contains objects to write.
349      * @param region the {@link DefaultFileRegion} from which the bytes should be written
350      * @return The value that should be decremented from the write quantum which starts at
351      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
352      * <ul>
353      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
354      *     is encountered</li>
355      *     <li>1 - if a single call to write data was made to the OS</li>
356      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
357      *     no data was accepted</li>
358      * </ul>
359      */
360     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
361         final long offset = region.transferred();
362         final long regionCount = region.count();
363         if (offset >= regionCount) {
364             in.remove();
365             return 0;
366         }
367 
368         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
369         if (flushedAmount > 0) {
370             in.progress(flushedAmount);
371             if (region.transferred() >= regionCount) {
372                 in.remove();
373             }
374             return 1;
375         } else if (flushedAmount == 0) {
376             validateFileRegion(region, offset);
377         }
378         return WRITE_STATUS_SNDBUF_FULL;
379     }
380 
381     /**
382      * Write a {@link FileRegion}
383      * @param in the collection which contains objects to write.
384      * @param region the {@link FileRegion} from which the bytes should be written
385      * @return The value that should be decremented from the write quantum which starts at
386      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
387      * <ul>
388      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
389      *     is encountered</li>
390      *     <li>1 - if a single call to write data was made to the OS</li>
391      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
392      *     no data was accepted</li>
393      * </ul>
394      */
395     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
396         if (region.transferred() >= region.count()) {
397             in.remove();
398             return 0;
399         }
400 
401         if (byteChannel == null) {
402             byteChannel = new EpollSocketWritableByteChannel();
403         }
404         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
405         if (flushedAmount > 0) {
406             in.progress(flushedAmount);
407             if (region.transferred() >= region.count()) {
408                 in.remove();
409             }
410             return 1;
411         }
412         return WRITE_STATUS_SNDBUF_FULL;
413     }
414 
415     @Override
416     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
417         int writeSpinCount = config().getWriteSpinCount();
418         do {
419             final int msgCount = in.size();
420             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
421             if (msgCount > 1 && in.current() instanceof ByteBuf) {
422                 writeSpinCount -= doWriteMultiple(in);
423             } else if (msgCount == 0) {
424                 // Wrote all messages.
425                 clearFlag(Native.EPOLLOUT);
426                 // Return here so we not set the EPOLLOUT flag.
427                 return;
428             } else {  // msgCount == 1
429                 writeSpinCount -= doWriteSingle(in);
430             }
431 
432             // We do not break the loop here even if the outbound buffer was flushed completely,
433             // because a user might have triggered another write and flush when we notify his or her
434             // listeners.
435         } while (writeSpinCount > 0);
436 
437         if (writeSpinCount == 0) {
438             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
439             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
440             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
441             // and set the EPOLLOUT if necessary.
442             clearFlag(Native.EPOLLOUT);
443 
444             // We used our writeSpin quantum, and should try to write again later.
445             eventLoop().execute(flushTask);
446         } else {
447             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
448             // when it can accept more data.
449             setFlag(Native.EPOLLOUT);
450         }
451     }
452 
453     /**
454      * Attempt to write a single object.
455      * @param in the collection which contains objects to write.
456      * @return The value that should be decremented from the write quantum which starts at
457      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
458      * <ul>
459      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
460      *     is encountered</li>
461      *     <li>1 - if a single call to write data was made to the OS</li>
462      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
463      *     no data was accepted</li>
464      * </ul>
465      * @throws Exception If an I/O error occurs.
466      */
467     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
468         // The outbound buffer contains only one message or it contains a file region.
469         Object msg = in.current();
470         if (msg instanceof ByteBuf) {
471             return writeBytes(in, (ByteBuf) msg);
472         } else if (msg instanceof DefaultFileRegion) {
473             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
474         } else if (msg instanceof FileRegion) {
475             return writeFileRegion(in, (FileRegion) msg);
476         } else if (msg instanceof SpliceOutTask) {
477             if (!((SpliceOutTask) msg).spliceOut()) {
478                 return WRITE_STATUS_SNDBUF_FULL;
479             }
480             in.remove();
481             return 1;
482         } else {
483             // Should never reach here.
484             throw new Error("Unexpected message type: " + className(msg));
485         }
486     }
487 
488     /**
489      * Attempt to write multiple {@link ByteBuf} objects.
490      * @param in the collection which contains objects to write.
491      * @return The value that should be decremented from the write quantum which starts at
492      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
493      * <ul>
494      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
495      *     is encountered</li>
496      *     <li>1 - if a single call to write data was made to the OS</li>
497      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
498      *     no data was accepted</li>
499      * </ul>
500      * @throws Exception If an I/O error occurs.
501      */
502     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
503         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
504         IovArray array =  ((NativeArrays) registration().attachment()).cleanIovArray();
505         array.maxBytes(maxBytesPerGatheringWrite);
506         in.forEachFlushedMessage(array);
507 
508         if (array.count() >= 1) {
509             // TODO: Handle the case where cnt == 1 specially.
510             return writeBytesMultiple(in, array);
511         }
512         // cnt == 0, which means the outbound buffer contained empty buffers only.
513         in.removeBytes(0);
514         return 0;
515     }
516 
517     @Override
518     protected Object filterOutboundMessage(Object msg) {
519         if (msg instanceof ByteBuf) {
520             ByteBuf buf = (ByteBuf) msg;
521             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
522         }
523 
524         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
525             return msg;
526         }
527 
528         throw new UnsupportedOperationException(
529                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
530     }
531 
532     @Override
533     protected final void doShutdownOutput() throws Exception {
534         socket.shutdown(false, true);
535     }
536 
537     private void shutdownInput0(final ChannelPromise promise) {
538         try {
539             socket.shutdown(true, false);
540             promise.setSuccess();
541         } catch (Throwable cause) {
542             promise.setFailure(cause);
543         }
544     }
545 
546     @Override
547     public boolean isOutputShutdown() {
548         return socket.isOutputShutdown();
549     }
550 
551     @Override
552     public boolean isInputShutdown() {
553         return socket.isInputShutdown();
554     }
555 
556     @Override
557     public boolean isShutdown() {
558         return socket.isShutdown();
559     }
560 
561     @Override
562     public ChannelFuture shutdownOutput() {
563         return shutdownOutput(newPromise());
564     }
565 
566     @Override
567     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
568         EventLoop loop = eventLoop();
569         if (loop.inEventLoop()) {
570             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
571         } else {
572             loop.execute(new Runnable() {
573                 @Override
574                 public void run() {
575                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
576                 }
577             });
578         }
579 
580         return promise;
581     }
582 
583     @Override
584     public ChannelFuture shutdownInput() {
585         return shutdownInput(newPromise());
586     }
587 
588     @Override
589     public ChannelFuture shutdownInput(final ChannelPromise promise) {
590         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
591         if (closeExecutor != null) {
592             closeExecutor.execute(new Runnable() {
593                 @Override
594                 public void run() {
595                     shutdownInput0(promise);
596                 }
597             });
598         } else {
599             EventLoop loop = eventLoop();
600             if (loop.inEventLoop()) {
601                 shutdownInput0(promise);
602             } else {
603                 loop.execute(new Runnable() {
604                     @Override
605                     public void run() {
606                         shutdownInput0(promise);
607                     }
608                 });
609             }
610         }
611         return promise;
612     }
613 
614     @Override
615     public ChannelFuture shutdown() {
616         return shutdown(newPromise());
617     }
618 
619     @Override
620     public ChannelFuture shutdown(final ChannelPromise promise) {
621         ChannelFuture shutdownOutputFuture = shutdownOutput();
622         if (shutdownOutputFuture.isDone()) {
623             shutdownOutputDone(shutdownOutputFuture, promise);
624         } else {
625             shutdownOutputFuture.addListener(new ChannelFutureListener() {
626                 @Override
627                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
628                     shutdownOutputDone(shutdownOutputFuture, promise);
629                 }
630             });
631         }
632         return promise;
633     }
634 
635     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
636         ChannelFuture shutdownInputFuture = shutdownInput();
637         if (shutdownInputFuture.isDone()) {
638             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
639         } else {
640             shutdownInputFuture.addListener(new ChannelFutureListener() {
641                 @Override
642                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
643                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
644                 }
645             });
646         }
647     }
648 
649     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
650                               ChannelFuture shutdownInputFuture,
651                               ChannelPromise promise) {
652         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
653         Throwable shutdownInputCause = shutdownInputFuture.cause();
654         if (shutdownOutputCause != null) {
655             if (shutdownInputCause != null) {
656                 logger.debug("Exception suppressed because a previous exception occurred.",
657                         shutdownInputCause);
658             }
659             promise.setFailure(shutdownOutputCause);
660         } else if (shutdownInputCause != null) {
661             promise.setFailure(shutdownInputCause);
662         } else {
663             promise.setSuccess();
664         }
665     }
666 
667     @Override
668     protected void doClose() throws Exception {
669         try {
670             // Calling super.doClose() first so spliceTo(...) will fail on next call.
671             super.doClose();
672         } finally {
673             safeClosePipe(pipeIn);
674             safeClosePipe(pipeOut);
675             clearSpliceQueue(null);
676         }
677     }
678 
679     private void clearSpliceQueue(ClosedChannelException exception) {
680         Queue<SpliceInTask> sQueue = spliceQueue;
681         if (sQueue == null) {
682             return;
683         }
684         for (;;) {
685             SpliceInTask task = sQueue.poll();
686             if (task == null) {
687                 break;
688             }
689             if (exception == null) {
690                 exception = new ClosedChannelException();
691             }
692             task.promise.tryFailure(exception);
693         }
694     }
695 
696     private static void safeClosePipe(FileDescriptor fd) {
697         if (fd != null) {
698             try {
699                 fd.close();
700             } catch (IOException e) {
701                 logger.warn("Error while closing a pipe", e);
702             }
703         }
704     }
705 
706     class EpollStreamUnsafe extends AbstractEpollUnsafe {
707         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
708         @Override
709         protected Executor prepareToClose() {
710             return super.prepareToClose();
711         }
712 
713         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
714                                          boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
715             if (byteBuf != null) {
716                 if (byteBuf.isReadable()) {
717                     readPending = false;
718                     pipeline.fireChannelRead(byteBuf);
719                 } else {
720                     byteBuf.release();
721                 }
722             }
723             allocHandle.readComplete();
724             pipeline.fireChannelReadComplete();
725             pipeline.fireExceptionCaught(cause);
726 
727             // If oom will close the read event, release connection.
728             // See https://github.com/netty/netty/issues/10434
729             if (allDataRead || cause instanceof OutOfMemoryError || cause instanceof IOException) {
730                 shutdownInput(true);
731             }
732         }
733 
734         @Override
735         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
736             return new EpollRecvByteAllocatorStreamingHandle(handle);
737         }
738 
739         @Override
740         void epollInReady() {
741             final ChannelConfig config = config();
742             if (shouldBreakEpollInReady(config)) {
743                 clearEpollIn0();
744                 return;
745             }
746             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
747             final ChannelPipeline pipeline = pipeline();
748             final ByteBufAllocator allocator = config.getAllocator();
749             allocHandle.reset(config);
750 
751             ByteBuf byteBuf = null;
752             boolean allDataRead = false;
753             Queue<SpliceInTask> sQueue = null;
754             try {
755                 do {
756                     if (sQueue != null || (sQueue = spliceQueue) != null) {
757                         SpliceInTask spliceTask = sQueue.peek();
758                         if (spliceTask != null) {
759                             boolean spliceInResult = spliceTask.spliceIn(allocHandle);
760 
761                             if (allocHandle.isReceivedRdHup()) {
762                                 shutdownInput(false);
763                             }
764                             if (spliceInResult) {
765                                 // We need to check if it is still active as if not we removed all SpliceTasks in
766                                 // doClose(...)
767                                 if (isActive()) {
768                                     sQueue.remove();
769                                 }
770                                 continue;
771                             } else {
772                                 break;
773                             }
774                         }
775                     }
776 
777                     // we use a direct buffer here as the native implementations only be able
778                     // to handle direct buffers.
779                     byteBuf = allocHandle.allocate(allocator);
780                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
781                     if (allocHandle.lastBytesRead() <= 0) {
782                         // nothing was read, release the buffer.
783                         byteBuf.release();
784                         byteBuf = null;
785                         allDataRead = allocHandle.lastBytesRead() < 0;
786                         if (allDataRead) {
787                             // There is nothing left to read as we received an EOF.
788                             readPending = false;
789                         }
790                         break;
791                     }
792                     allocHandle.incMessagesRead(1);
793                     readPending = false;
794                     pipeline.fireChannelRead(byteBuf);
795                     byteBuf = null;
796 
797                     if (shouldBreakEpollInReady(config)) {
798                         // We need to do this for two reasons:
799                         //
800                         // - If the input was shutdown in between (which may be the case when the user did it in the
801                         //   fireChannelRead(...) method we should not try to read again to not produce any
802                         //   miss-leading exceptions.
803                         //
804                         // - If the user closes the channel we need to ensure we not try to read from it again as
805                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
806                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
807                         //   reading data from a filedescriptor that belongs to another socket then the socket that
808                         //   was "wrapped" by this Channel implementation.
809                         break;
810                     }
811                 } while (allocHandle.continueReading());
812 
813                 allocHandle.readComplete();
814                 pipeline.fireChannelReadComplete();
815 
816                 if (allDataRead) {
817                     shutdownInput(true);
818                 }
819             } catch (Throwable t) {
820                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
821             } finally {
822                 if (sQueue == null) {
823                     if (shouldStopReading(config)) {
824                         clearEpollIn();
825                     }
826                 } else {
827                     if (!config.isAutoRead()) {
828                         clearEpollIn();
829                     }
830                 }
831             }
832         }
833     }
834 
835     private void addToSpliceQueue(final SpliceInTask task) {
836         Queue<SpliceInTask> sQueue = spliceQueue;
837         if (sQueue == null) {
838             synchronized (this) {
839                 sQueue = spliceQueue;
840                 if (sQueue == null) {
841                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
842                 }
843             }
844         }
845         sQueue.add(task);
846     }
847 
848     protected abstract class SpliceInTask {
849         final ChannelPromise promise;
850         int len;
851 
852         protected SpliceInTask(int len, ChannelPromise promise) {
853             this.promise = promise;
854             this.len = len;
855         }
856 
857         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
858 
859         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
860             // calculate the maximum amount of data we are allowed to splice
861             int length = Math.min(handle.guess(), len);
862             int splicedIn = 0;
863             for (;;) {
864                 // Splicing until there is nothing left to splice.
865                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
866                 handle.lastBytesRead(localSplicedIn);
867                 if (localSplicedIn == 0) {
868                     break;
869                 }
870                 splicedIn += localSplicedIn;
871                 length -= localSplicedIn;
872             }
873 
874             return splicedIn;
875         }
876     }
877 
878     // Let it directly implement channelFutureListener as well to reduce object creation.
879     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
880         private final AbstractEpollStreamChannel ch;
881 
882         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
883             super(len, promise);
884             this.ch = ch;
885         }
886 
887         @Override
888         public void operationComplete(ChannelFuture future) throws Exception {
889             if (!future.isSuccess()) {
890                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
891                 promise.tryFailure(future.cause());
892             }
893         }
894 
895         @Override
896         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
897             assert ch.eventLoop().inEventLoop();
898             if (len == 0) {
899                 // Use trySuccess() as the promise might already be closed by spliceTo(...)
900                 promise.trySuccess();
901                 return true;
902             }
903             try {
904                 // We create the pipe on the target channel as this will allow us to just handle pending writes
905                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
906                 // on multiple Channels pointing to one target Channel.
907                 FileDescriptor pipeOut = ch.pipeOut;
908                 if (pipeOut == null) {
909                     // Create a new pipe as non was created before.
910                     FileDescriptor[] pipe = pipe();
911                     ch.pipeIn = pipe[0];
912                     pipeOut = ch.pipeOut = pipe[1];
913                 }
914 
915                 int splicedIn = spliceIn(pipeOut, handle);
916                 if (splicedIn > 0) {
917                     // Integer.MAX_VALUE is a special value which will result in splice forever.
918                     if (len != Integer.MAX_VALUE) {
919                         len -= splicedIn;
920                     }
921 
922                     // Depending on if we are done with splicing inbound data we set the right promise for the
923                     // outbound splicing.
924                     final ChannelPromise splicePromise;
925                     if (len == 0) {
926                         splicePromise = promise;
927                     } else {
928                         splicePromise = ch.newPromise().addListener(this);
929                     }
930 
931                     boolean autoRead = config().isAutoRead();
932 
933                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
934                     // case.
935                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
936                     ch.unsafe().flush();
937                     if (autoRead && !splicePromise.isDone()) {
938                         // Write was not done which means the target channel was not writable. In this case we need to
939                         // disable reading until we are done with splicing to the target channel because:
940                         //
941                         // - The user may want to trigger another splice operation once the splicing was complete.
942                         config().setAutoRead(false);
943                     }
944                 }
945 
946                 return len == 0;
947             } catch (Throwable cause) {
948                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
949                 promise.tryFailure(cause);
950                 return true;
951             }
952         }
953     }
954 
955     private final class SpliceOutTask {
956         private final AbstractEpollStreamChannel ch;
957         private final boolean autoRead;
958         private int len;
959 
960         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
961             this.ch = ch;
962             this.len = len;
963             this.autoRead = autoRead;
964         }
965 
966         public boolean spliceOut() throws Exception {
967             assert ch.eventLoop().inEventLoop();
968             try {
969                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
970                 len -= splicedOut;
971                 if (len == 0) {
972                     if (autoRead) {
973                         // AutoRead was used and we spliced everything so start reading again
974                         config().setAutoRead(true);
975                     }
976                     return true;
977                 }
978                 return false;
979             } catch (IOException e) {
980                 if (autoRead) {
981                     // AutoRead was used and we spliced everything so start reading again
982                     config().setAutoRead(true);
983                 }
984                 throw e;
985             }
986         }
987     }
988 
989     private final class SpliceFdTask extends SpliceInTask {
990         private final FileDescriptor fd;
991         private final ChannelPromise promise;
992         private int offset;
993 
994         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
995             super(len, promise);
996             this.fd = fd;
997             this.promise = promise;
998             this.offset = offset;
999         }
1000 
1001         @Override
1002         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1003             assert eventLoop().inEventLoop();
1004             if (len == 0) {
1005                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1006                 promise.trySuccess();
1007                 return true;
1008             }
1009 
1010             try {
1011                 FileDescriptor[] pipe = pipe();
1012                 FileDescriptor pipeIn = pipe[0];
1013                 FileDescriptor pipeOut = pipe[1];
1014                 try {
1015                     int splicedIn = spliceIn(pipeOut, handle);
1016                     if (splicedIn > 0) {
1017                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1018                         if (len != Integer.MAX_VALUE) {
1019                             len -= splicedIn;
1020                         }
1021                         do {
1022                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1023                             offset += splicedOut;
1024                             splicedIn -= splicedOut;
1025                         } while (splicedIn > 0);
1026                         if (len == 0) {
1027                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1028                             promise.trySuccess();
1029                             return true;
1030                         }
1031                     }
1032                     return false;
1033                 } finally {
1034                     safeClosePipe(pipeIn);
1035                     safeClosePipe(pipeOut);
1036                 }
1037             } catch (Throwable cause) {
1038                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1039                 promise.tryFailure(cause);
1040                 return true;
1041             }
1042         }
1043     }
1044 
1045     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1046         EpollSocketWritableByteChannel() {
1047             super(socket);
1048             assert fd == socket;
1049         }
1050 
1051         @Override
1052         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1053             return socket.send(buf, pos, limit);
1054         }
1055 
1056         @Override
1057         protected ByteBufAllocator alloc() {
1058             return AbstractEpollStreamChannel.this.alloc();
1059         }
1060     }
1061 }