View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.WritableByteChannel;
48  import java.util.Queue;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
53  import static io.netty.channel.unix.FileDescriptor.pipe;
54  import static io.netty.util.internal.ObjectUtil.checkNotNull;
55  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
56  
57  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
    // Metadata returned by metadata() for every stream channel of this type.
    // NOTE(review): per the ChannelMetadata constructor, (false, 16) should mean "no disconnect() support" and a
    // default of 16 max messages per read — confirm against the Netty version in use.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Suffix appended to the UnsupportedOperationException message thrown by filterOutboundMessage(...).
    // NOTE(review): filterOutboundMessage(...) accepts any FileRegion (and SpliceOutTask), not only
    // DefaultFileRegion, so this hint under-describes the accepted types.
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
63  
    /**
     * Submitted to the event loop by {@code doWrite(ChannelOutboundBuffer)} once the write-spin quantum has been used
     * up, so that writing the remaining flushed data continues in a separate event-loop task.
     */
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Call flush0() directly so that we do not try to flush messages that were added via write(...) in the
            // meantime (only the already-flushed messages are written).
            ((AbstractEpollUnsafe) unsafe()).flush0();
        }
    };
72  
    // Lazily initialized only if splice(...) is actually used.
    // NOTE(review): these are assigned outside this view (presumably addToSpliceQueue(...) and the splice tasks).
    // spliceQueue is drained by clearSpliceQueue(...); pipeIn/pipeOut are closed (if non-null) by doClose().
    private volatile Queue<SpliceInTask> spliceQueue;
    private FileDescriptor pipeIn;
    private FileDescriptor pipeOut;

    // Created lazily by writeFileRegion(...) and reused for later writes of non-DefaultFileRegion regions.
    private WritableByteChannel byteChannel;
79  
80      protected AbstractEpollStreamChannel(Channel parent, int fd) {
81          this(parent, new LinuxSocket(fd));
82      }
83  
84      protected AbstractEpollStreamChannel(int fd) {
85          this(new LinuxSocket(fd));
86      }
87  
88      AbstractEpollStreamChannel(LinuxSocket fd) {
89          this(fd, isSoErrorZero(fd));
90      }
91  
92      AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
93          super(parent, fd, true);
94          // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
95          flags |= Native.EPOLLRDHUP;
96      }
97  
98      protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
99          super(parent, fd, remote);
100         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
101         flags |= Native.EPOLLRDHUP;
102     }
103 
104     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
105         super(null, fd, active);
106         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
107         flags |= Native.EPOLLRDHUP;
108     }
109 
110     @Override
111     protected AbstractEpollUnsafe newUnsafe() {
112         return new EpollStreamUnsafe();
113     }
114 
115     @Override
116     public ChannelMetadata metadata() {
117         return METADATA;
118     }
119 
120     /**
121      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
122      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
123      * splice until the {@link ChannelFuture} was canceled or it was failed.
124      *
125      * Please note:
126      * <ul>
127      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
128      *   {@link IllegalArgumentException} is thrown. </li>
129      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
130      *   target {@link AbstractEpollStreamChannel}</li>
131      * </ul>
132      * @deprecated Will be removed in the future.
133      */
134     @Deprecated
135     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136         return spliceTo(ch, len, newPromise());
137     }
138 
139     /**
140      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
141      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
142      * splice until the {@link ChannelFuture} was canceled or it was failed.
143      *
144      * Please note:
145      * <ul>
146      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
147      *   {@link IllegalArgumentException} is thrown. </li>
148      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
149      *   target {@link AbstractEpollStreamChannel}</li>
150      * </ul>
151      * @deprecated will be removed in the future.
152      */
153     @Deprecated
154     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
155                                         final ChannelPromise promise) {
156         if (ch.eventLoop() != eventLoop()) {
157             throw new IllegalArgumentException("EventLoops are not the same.");
158         }
159         checkPositiveOrZero(len, "len");
160         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
161                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
162             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
163         }
164         checkNotNull(promise, "promise");
165         if (!isOpen()) {
166             promise.tryFailure(new ClosedChannelException());
167         } else {
168             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
169             failSpliceIfClosed(promise);
170         }
171         return promise;
172     }
173 
174     /**
175      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
176      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
177      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
178      * {@link ChannelFuture} was canceled or it was failed.
179      *
180      * Please note:
181      * <ul>
182      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
183      *   {@link AbstractEpollStreamChannel}</li>
184      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
185      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
186      * </ul>
187      * @deprecated Will be removed in the future.
188      */
189     @Deprecated
190     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
191         return spliceTo(ch, offset, len, newPromise());
192     }
193 
194     /**
195      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
196      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
197      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
198      * {@link ChannelFuture} was canceled or it was failed.
199      *
200      * Please note:
201      * <ul>
202      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
203      *   {@link AbstractEpollStreamChannel}</li>
204      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
205      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
206      * </ul>
207      * @deprecated Will be removed in the future.
208      */
209     @Deprecated
210     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
211                                         final ChannelPromise promise) {
212         checkPositiveOrZero(len, "len");
213         checkPositiveOrZero(offset, "offset");
214         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
215             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
216         }
217         checkNotNull(promise, "promise");
218         if (!isOpen()) {
219             promise.tryFailure(new ClosedChannelException());
220         } else {
221             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
222             failSpliceIfClosed(promise);
223         }
224         return promise;
225     }
226 
227     private void failSpliceIfClosed(ChannelPromise promise) {
228         if (!isOpen()) {
229             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
230             // cases where a future may not be notified otherwise.
231             if (!promise.isDone()) {
232                 final ClosedChannelException ex = new ClosedChannelException();
233                 if (promise.tryFailure(ex)) {
234                     eventLoop().execute(new Runnable() {
235                         @Override
236                         public void run() {
237                             // Call this via the EventLoop as it is a MPSC queue.
238                             clearSpliceQueue(ex);
239                         }
240                     });
241                 }
242             }
243         }
244     }
245 
246     /**
247      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
248      * @param in the collection which contains objects to write.
249      * @param buf the {@link ByteBuf} from which the bytes should be written
250      * @return The value that should be decremented from the write quantum which starts at
251      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
252      * <ul>
253      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
254      *     is encountered</li>
255      *     <li>1 - if a single call to write data was made to the OS</li>
256      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
257      *     no data was accepted</li>
258      * </ul>
259      */
260     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
261         int readableBytes = buf.readableBytes();
262         if (readableBytes == 0) {
263             in.remove();
264             return 0;
265         }
266 
267         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
268             return doWriteBytes(in, buf);
269         } else {
270             ByteBuffer[] nioBuffers = buf.nioBuffers();
271             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
272                     config().getMaxBytesPerGatheringWrite());
273         }
274     }
275 
276     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
277         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
278         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
279         // make a best effort to adjust as OS behavior changes.
280         if (attempted == written) {
281             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
282                 config().setMaxBytesPerGatheringWrite(attempted << 1);
283             }
284         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
285             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
286         }
287     }
288 
289     /**
290      * Write multiple bytes via {@link IovArray}.
291      * @param in the collection which contains objects to write.
292      * @param array The array which contains the content to write.
293      * @return The value that should be decremented from the write quantum which starts at
294      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
295      * <ul>
296      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
297      *     is encountered</li>
298      *     <li>1 - if a single call to write data was made to the OS</li>
299      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
300      *     no data was accepted</li>
301      * </ul>
302      * @throws IOException If an I/O exception occurs during write.
303      */
304     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
305         final long expectedWrittenBytes = array.size();
306         assert expectedWrittenBytes != 0;
307         final int cnt = array.count();
308         assert cnt != 0;
309 
310         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
311         if (localWrittenBytes > 0) {
312             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
313             in.removeBytes(localWrittenBytes);
314             return 1;
315         }
316         return WRITE_STATUS_SNDBUF_FULL;
317     }
318 
319     /**
320      * Write multiple bytes via {@link ByteBuffer} array.
321      * @param in the collection which contains objects to write.
322      * @param nioBuffers The buffers to write.
323      * @param nioBufferCnt The number of buffers to write.
324      * @param expectedWrittenBytes The number of bytes we expect to write.
325      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
326      * @return The value that should be decremented from the write quantum which starts at
327      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
328      * <ul>
329      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
330      *     is encountered</li>
331      *     <li>1 - if a single call to write data was made to the OS</li>
332      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
333      *     no data was accepted</li>
334      * </ul>
335      * @throws IOException If an I/O exception occurs during write.
336      */
337     private int writeBytesMultiple(
338             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
339             long maxBytesPerGatheringWrite) throws IOException {
340         assert expectedWrittenBytes != 0;
341         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
342             expectedWrittenBytes = maxBytesPerGatheringWrite;
343         }
344 
345         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
346         if (localWrittenBytes > 0) {
347             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
348             in.removeBytes(localWrittenBytes);
349             return 1;
350         }
351         return WRITE_STATUS_SNDBUF_FULL;
352     }
353 
354     /**
355      * Write a {@link DefaultFileRegion}
356      * @param in the collection which contains objects to write.
357      * @param region the {@link DefaultFileRegion} from which the bytes should be written
358      * @return The value that should be decremented from the write quantum which starts at
359      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
360      * <ul>
361      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
362      *     is encountered</li>
363      *     <li>1 - if a single call to write data was made to the OS</li>
364      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
365      *     no data was accepted</li>
366      * </ul>
367      */
368     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
369         final long offset = region.transferred();
370         final long regionCount = region.count();
371         if (offset >= regionCount) {
372             in.remove();
373             return 0;
374         }
375 
376         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
377         if (flushedAmount > 0) {
378             in.progress(flushedAmount);
379             if (region.transferred() >= regionCount) {
380                 in.remove();
381             }
382             return 1;
383         } else if (flushedAmount == 0) {
384             validateFileRegion(region, offset);
385         }
386         return WRITE_STATUS_SNDBUF_FULL;
387     }
388 
389     /**
390      * Write a {@link FileRegion}
391      * @param in the collection which contains objects to write.
392      * @param region the {@link FileRegion} from which the bytes should be written
393      * @return The value that should be decremented from the write quantum which starts at
394      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
395      * <ul>
396      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
397      *     is encountered</li>
398      *     <li>1 - if a single call to write data was made to the OS</li>
399      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
400      *     no data was accepted</li>
401      * </ul>
402      */
403     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
404         if (region.transferred() >= region.count()) {
405             in.remove();
406             return 0;
407         }
408 
409         if (byteChannel == null) {
410             byteChannel = new EpollSocketWritableByteChannel();
411         }
412         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
413         if (flushedAmount > 0) {
414             in.progress(flushedAmount);
415             if (region.transferred() >= region.count()) {
416                 in.remove();
417             }
418             return 1;
419         }
420         return WRITE_STATUS_SNDBUF_FULL;
421     }
422 
    /**
     * Writes flushed content from {@code in} until it is empty or the write-spin quantum
     * ({@link ChannelConfig#getWriteSpinCount()}) is exhausted.
     *
     * <p>Each iteration subtracts the value returned by {@code doWriteMultiple}/{@code doWriteSingle} from the
     * quantum. Outcomes:
     * <ul>
     *   <li>nothing left to write: {@code EPOLLOUT} is cleared and the method returns;</li>
     *   <li>quantum exactly used up (count reached 0): {@code EPOLLOUT} is cleared and {@code flushTask} is submitted
     *   to the event loop to continue later;</li>
     *   <li>count went negative (e.g. a write returned {@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL}):
     *   {@code EPOLLOUT} is set so epoll reports when the socket can accept more data.</li>
     * </ul>
     *
     * @param in the outbound buffer holding the flushed messages
     * @throws Exception if a write fails
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = config().getWriteSpinCount();
        do {
            final int msgCount = in.size();
            // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
            if (msgCount > 1 && in.current() instanceof ByteBuf) {
                writeSpinCount -= doWriteMultiple(in);
            } else if (msgCount == 0) {
                // Wrote all messages.
                clearFlag(Native.EPOLLOUT);
                // Return here so we do not set the EPOLLOUT flag.
                return;
            } else {  // msgCount == 1, or the first message is not a ByteBuf (e.g. a FileRegion or SpliceOutTask)
                writeSpinCount -= doWriteSingle(in);
            }

            // We do not break the loop here even if the outbound buffer was flushed completely,
            // because a user might have triggered another write and flush when we notify his or her
            // listeners.
        } while (writeSpinCount > 0);

        if (writeSpinCount == 0) {
            // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
            // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the EPOLLOUT if necessary.
            clearFlag(Native.EPOLLOUT);

            // We used our writeSpin quantum, and should try to write again later.
            eventLoop().execute(flushTask);
        } else {
            // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
            // when it can accept more data.
            setFlag(Native.EPOLLOUT);
        }
    }
460 
461     /**
462      * Attempt to write a single object.
463      * @param in the collection which contains objects to write.
464      * @return The value that should be decremented from the write quantum which starts at
465      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
466      * <ul>
467      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
468      *     is encountered</li>
469      *     <li>1 - if a single call to write data was made to the OS</li>
470      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
471      *     no data was accepted</li>
472      * </ul>
473      * @throws Exception If an I/O error occurs.
474      */
475     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
476         // The outbound buffer contains only one message or it contains a file region.
477         Object msg = in.current();
478         if (msg instanceof ByteBuf) {
479             return writeBytes(in, (ByteBuf) msg);
480         } else if (msg instanceof DefaultFileRegion) {
481             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
482         } else if (msg instanceof FileRegion) {
483             return writeFileRegion(in, (FileRegion) msg);
484         } else if (msg instanceof SpliceOutTask) {
485             if (!((SpliceOutTask) msg).spliceOut()) {
486                 return WRITE_STATUS_SNDBUF_FULL;
487             }
488             in.remove();
489             return 1;
490         } else {
491             // Should never reach here.
492             throw new Error();
493         }
494     }
495 
496     /**
497      * Attempt to write multiple {@link ByteBuf} objects.
498      * @param in the collection which contains objects to write.
499      * @return The value that should be decremented from the write quantum which starts at
500      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
501      * <ul>
502      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
503      *     is encountered</li>
504      *     <li>1 - if a single call to write data was made to the OS</li>
505      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
506      *     no data was accepted</li>
507      * </ul>
508      * @throws Exception If an I/O error occurs.
509      */
510     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
511         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
512         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
513         array.maxBytes(maxBytesPerGatheringWrite);
514         in.forEachFlushedMessage(array);
515 
516         if (array.count() >= 1) {
517             // TODO: Handle the case where cnt == 1 specially.
518             return writeBytesMultiple(in, array);
519         }
520         // cnt == 0, which means the outbound buffer contained empty buffers only.
521         in.removeBytes(0);
522         return 0;
523     }
524 
525     @Override
526     protected Object filterOutboundMessage(Object msg) {
527         if (msg instanceof ByteBuf) {
528             ByteBuf buf = (ByteBuf) msg;
529             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
530         }
531 
532         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
533             return msg;
534         }
535 
536         throw new UnsupportedOperationException(
537                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
538     }
539 
540     @Override
541     protected final void doShutdownOutput() throws Exception {
542         socket.shutdown(false, true);
543     }
544 
545     private void shutdownInput0(final ChannelPromise promise) {
546         try {
547             socket.shutdown(true, false);
548             promise.setSuccess();
549         } catch (Throwable cause) {
550             promise.setFailure(cause);
551         }
552     }
553 
554     @Override
555     public boolean isOutputShutdown() {
556         return socket.isOutputShutdown();
557     }
558 
559     @Override
560     public boolean isInputShutdown() {
561         return socket.isInputShutdown();
562     }
563 
564     @Override
565     public boolean isShutdown() {
566         return socket.isShutdown();
567     }
568 
569     @Override
570     public ChannelFuture shutdownOutput() {
571         return shutdownOutput(newPromise());
572     }
573 
574     @Override
575     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
576         EventLoop loop = eventLoop();
577         if (loop.inEventLoop()) {
578             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
579         } else {
580             loop.execute(new Runnable() {
581                 @Override
582                 public void run() {
583                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
584                 }
585             });
586         }
587 
588         return promise;
589     }
590 
591     @Override
592     public ChannelFuture shutdownInput() {
593         return shutdownInput(newPromise());
594     }
595 
596     @Override
597     public ChannelFuture shutdownInput(final ChannelPromise promise) {
598         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
599         if (closeExecutor != null) {
600             closeExecutor.execute(new Runnable() {
601                 @Override
602                 public void run() {
603                     shutdownInput0(promise);
604                 }
605             });
606         } else {
607             EventLoop loop = eventLoop();
608             if (loop.inEventLoop()) {
609                 shutdownInput0(promise);
610             } else {
611                 loop.execute(new Runnable() {
612                     @Override
613                     public void run() {
614                         shutdownInput0(promise);
615                     }
616                 });
617             }
618         }
619         return promise;
620     }
621 
622     @Override
623     public ChannelFuture shutdown() {
624         return shutdown(newPromise());
625     }
626 
627     @Override
628     public ChannelFuture shutdown(final ChannelPromise promise) {
629         ChannelFuture shutdownOutputFuture = shutdownOutput();
630         if (shutdownOutputFuture.isDone()) {
631             shutdownOutputDone(shutdownOutputFuture, promise);
632         } else {
633             shutdownOutputFuture.addListener(new ChannelFutureListener() {
634                 @Override
635                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
636                     shutdownOutputDone(shutdownOutputFuture, promise);
637                 }
638             });
639         }
640         return promise;
641     }
642 
643     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
644         ChannelFuture shutdownInputFuture = shutdownInput();
645         if (shutdownInputFuture.isDone()) {
646             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
647         } else {
648             shutdownInputFuture.addListener(new ChannelFutureListener() {
649                 @Override
650                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
651                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
652                 }
653             });
654         }
655     }
656 
657     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
658                               ChannelFuture shutdownInputFuture,
659                               ChannelPromise promise) {
660         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
661         Throwable shutdownInputCause = shutdownInputFuture.cause();
662         if (shutdownOutputCause != null) {
663             if (shutdownInputCause != null) {
664                 logger.debug("Exception suppressed because a previous exception occurred.",
665                         shutdownInputCause);
666             }
667             promise.setFailure(shutdownOutputCause);
668         } else if (shutdownInputCause != null) {
669             promise.setFailure(shutdownInputCause);
670         } else {
671             promise.setSuccess();
672         }
673     }
674 
    /**
     * Closes the channel and then releases splice resources.
     *
     * <p>The pipe descriptors are closed and queued splice tasks are failed with a {@link ClosedChannelException}.
     * Both happen in a {@code finally} block, so they also run when {@code super.doClose()} throws.
     *
     * @throws Exception if closing the underlying channel fails
     */
    @Override
    protected void doClose() throws Exception {
        try {
            // Calling super.doClose() first so spliceTo(...) will fail on its next call (it checks isOpen()).
            super.doClose();
        } finally {
            safeClosePipe(pipeIn);
            safeClosePipe(pipeOut);
            clearSpliceQueue(null);
        }
    }
686 
687     private void clearSpliceQueue(ClosedChannelException exception) {
688         Queue<SpliceInTask> sQueue = spliceQueue;
689         if (sQueue == null) {
690             return;
691         }
692         for (;;) {
693             SpliceInTask task = sQueue.poll();
694             if (task == null) {
695                 break;
696             }
697             if (exception == null) {
698                 exception = new ClosedChannelException();
699             }
700             task.promise.tryFailure(exception);
701         }
702     }
703 
704     private static void safeClosePipe(FileDescriptor fd) {
705         if (fd != null) {
706             try {
707                 fd.close();
708             } catch (IOException e) {
709                 logger.warn("Error while closing a pipe", e);
710             }
711         }
712     }
713 
    /**
     * Unsafe implementation for epoll stream channels. On read readiness it first services the head of the
     * splice queue (if any) and otherwise reads into newly allocated {@link ByteBuf}s that are fired through
     * the pipeline.
     */
    class EpollStreamUnsafe extends AbstractEpollUnsafe {
        // Overridden here just to be able to access this method from AbstractEpollStreamChannel
        @Override
        protected Executor prepareToClose() {
            return super.prepareToClose();
        }

        /**
         * Handles a failure raised during {@link #epollInReady()}.
         *
         * Any partially filled buffer is fired if readable and released otherwise. The read cycle is then
         * completed, {@code exceptionCaught} is fired, and input is shut down when appropriate.
         *
         * @param pipeline    the pipeline to notify
         * @param byteBuf     the buffer being filled when the failure happened, or {@code null}
         * @param cause       the failure
         * @param close       {@code true} if EOF had already been observed in this read cycle
         * @param allocHandle the allocator handle of the current read cycle
         */
        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                EpollRecvByteAllocatorHandle allocHandle) {
            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    // Deliver data read before the failure rather than dropping it.
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);

            // On EOF, OutOfMemoryError or IOException shut down the input so the connection is released
            // instead of repeatedly handling the read event.
            // See https://github.com/netty/netty/issues/10434
            if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
                shutdownInput(false);
            }
        }

        /**
         * Wraps {@code handle} in an {@link EpollRecvByteAllocatorStreamingHandle}.
         */
        @Override
        EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
            return new EpollRecvByteAllocatorStreamingHandle(handle);
        }

        /**
         * Read-readiness handler. Each loop iteration either services the head {@link SpliceInTask}
         * (when the splice queue exists and is non-empty) or performs a normal read into a new buffer. The loop
         * stops when the allocator handle says so, when nothing/EOF is read, when a splice task is not yet
         * finished, or when {@code shouldBreakEpollInReady(config)} returns {@code true}.
         */
        @Override
        void epollInReady() {
            final ChannelConfig config = config();
            if (shouldBreakEpollInReady(config)) {
                clearEpollIn0();
                return;
            }
            final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);
            epollInBefore();

            ByteBuf byteBuf = null;
            boolean close = false;
            // Stays null unless a splice queue was observed; this also selects the finally-branch below.
            Queue<SpliceInTask> sQueue = null;
            try {
                do {
                    if (sQueue != null || (sQueue = spliceQueue) != null) {
                        SpliceInTask spliceTask = sQueue.peek();
                        if (spliceTask != null) {
                            boolean spliceInResult = spliceTask.spliceIn(allocHandle);

                            if (allocHandle.isReceivedRdHup()) {
                                shutdownInput(true);
                            }
                            if (spliceInResult) {
                                // We need to check if it is still active as if not we removed all SpliceTasks in
                                // doClose(...)
                                if (isActive()) {
                                    sQueue.remove();
                                }
                                // In a do/while, continue jumps to continueReading(); if it allows, the next
                                // queued task (or a normal read) is processed.
                                continue;
                            } else {
                                // The task needs more data; wait for the next read-readiness notification.
                                break;
                            }
                        }
                    }

                    // We use a direct buffer here as the native implementations are only able
                    // to handle direct buffers.
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // Nothing was read, release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    // Ownership passed to the pipeline; make sure handleReadException does not touch it.
                    byteBuf = null;

                    if (shouldBreakEpollInReady(config)) {
                        // We need to do this for two reasons:
                        //
                        // - If the input was shutdown in between (which may be the case when the user did it in the
                        //   fireChannelRead(...) method we should not try to read again to not produce any
                        //   misleading exceptions.
                        //
                        // - If the user closes the channel we need to ensure we not try to read from it again as
                        //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
                        //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
                        //   reading data from a filedescriptor that belongs to another socket than the socket that
                        //   was "wrapped" by this Channel implementation.
                        break;
                    }
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    shutdownInput(false);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Without a splice queue use the regular epollInFinally(...) handling; with one, only
                // clearEpollIn() when autoRead is disabled.
                if (sQueue == null) {
                    epollInFinally(config);
                } else {
                    if (!config.isAutoRead()) {
                        clearEpollIn();
                    }
                }
            }
        }
    }
843 
844     private void addToSpliceQueue(final SpliceInTask task) {
845         Queue<SpliceInTask> sQueue = spliceQueue;
846         if (sQueue == null) {
847             synchronized (this) {
848                 sQueue = spliceQueue;
849                 if (sQueue == null) {
850                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
851                 }
852             }
853         }
854         sQueue.add(task);
855     }
856 
857     protected abstract class SpliceInTask {
858         final ChannelPromise promise;
859         int len;
860 
861         protected SpliceInTask(int len, ChannelPromise promise) {
862             this.promise = promise;
863             this.len = len;
864         }
865 
866         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
867 
868         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
869             // calculate the maximum amount of data we are allowed to splice
870             int length = Math.min(handle.guess(), len);
871             int splicedIn = 0;
872             for (;;) {
873                 // Splicing until there is nothing left to splice.
874                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
875                 handle.lastBytesRead(localSplicedIn);
876                 if (localSplicedIn == 0) {
877                     break;
878                 }
879                 splicedIn += localSplicedIn;
880                 length -= localSplicedIn;
881             }
882 
883             return splicedIn;
884         }
885     }
886 
    /**
     * {@link SpliceInTask} that forwards data from this channel's socket to another
     * {@link AbstractEpollStreamChannel} ({@code ch}) through the target channel's pipe.
     */
    // Let it directly implement channelFutureListener as well to reduce object creation.
    private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
        // Target channel: owns the pipe and receives the SpliceOutTask writes.
        private final AbstractEpollStreamChannel ch;

        SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
            super(len, promise);
            this.ch = ch;
        }

        /**
         * Attached to intermediate splice-out promises; propagates their failure to this task's promise.
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
                promise.tryFailure(future.cause());
            }
        }

        /**
         * Splices available bytes from this channel's socket into the target channel's pipe (creating the pipe
         * on first use), then writes and flushes a {@link SpliceOutTask} on the target channel.
         *
         * @return {@code true} when the task is finished ({@code len} reached 0 or the promise was failed),
         *         {@code false} when more data must be spliced later
         */
        @Override
        public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
            // The target channel must be served by the current event loop thread.
            assert ch.eventLoop().inEventLoop();
            if (len == 0) {
                // Use trySuccess() as the promise might already be closed by spliceTo(...)
                promise.trySuccess();
                return true;
            }
            try {
                // We create the pipe on the target channel as this will allow us to just handle pending writes
                // later in a correct fashion without getting into any ordering issues when spliceTo(...) is called
                // on multiple Channels pointing to one target Channel.
                FileDescriptor pipeOut = ch.pipeOut;
                if (pipeOut == null) {
                    // Create a new pipe as none was created before.
                    FileDescriptor[] pipe = pipe();
                    ch.pipeIn = pipe[0];
                    pipeOut = ch.pipeOut = pipe[1];
                }

                int splicedIn = spliceIn(pipeOut, handle);
                if (splicedIn > 0) {
                    // Integer.MAX_VALUE is a special value which will result in splice forever.
                    if (len != Integer.MAX_VALUE) {
                        len -= splicedIn;
                    }

                    // Depending on if we are done with splicing inbound data we set the right promise for the
                    // outbound splicing: the final chunk completes the user's promise directly, intermediate
                    // chunks get a fresh promise whose failure is forwarded by operationComplete(...).
                    final ChannelPromise splicePromise;
                    if (len == 0) {
                        splicePromise = promise;
                    } else {
                        splicePromise = ch.newPromise().addListener(this);
                    }

                    boolean autoRead = config().isAutoRead();

                    // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
                    // case.
                    ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
                    ch.unsafe().flush();
                    if (autoRead && !splicePromise.isDone()) {
                        // Write was not done which means the target channel was not writable. In this case we need to
                        // disable reading until we are done with splicing to the target channel because:
                        //
                        // - The user may want to trigger another splice operation once the splicing was complete.
                        //
                        // SpliceOutTask turns autoRead back on once its bytes are written (or splicing fails).
                        config().setAutoRead(false);
                    }
                }

                return len == 0;
            } catch (Throwable cause) {
                // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
                promise.tryFailure(cause);
                return true;
            }
        }
    }
963 
964     private final class SpliceOutTask {
965         private final AbstractEpollStreamChannel ch;
966         private final boolean autoRead;
967         private int len;
968 
969         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
970             this.ch = ch;
971             this.len = len;
972             this.autoRead = autoRead;
973         }
974 
975         public boolean spliceOut() throws Exception {
976             assert ch.eventLoop().inEventLoop();
977             try {
978                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
979                 len -= splicedOut;
980                 if (len == 0) {
981                     if (autoRead) {
982                         // AutoRead was used and we spliced everything so start reading again
983                         config().setAutoRead(true);
984                     }
985                     return true;
986                 }
987                 return false;
988             } catch (IOException e) {
989                 if (autoRead) {
990                     // AutoRead was used and we spliced everything so start reading again
991                     config().setAutoRead(true);
992                 }
993                 throw e;
994             }
995         }
996     }
997 
998     private final class SpliceFdTask extends SpliceInTask {
999         private final FileDescriptor fd;
1000         private final ChannelPromise promise;
1001         private int offset;
1002 
1003         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1004             super(len, promise);
1005             this.fd = fd;
1006             this.promise = promise;
1007             this.offset = offset;
1008         }
1009 
1010         @Override
1011         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1012             assert eventLoop().inEventLoop();
1013             if (len == 0) {
1014                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1015                 promise.trySuccess();
1016                 return true;
1017             }
1018 
1019             try {
1020                 FileDescriptor[] pipe = pipe();
1021                 FileDescriptor pipeIn = pipe[0];
1022                 FileDescriptor pipeOut = pipe[1];
1023                 try {
1024                     int splicedIn = spliceIn(pipeOut, handle);
1025                     if (splicedIn > 0) {
1026                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1027                         if (len != Integer.MAX_VALUE) {
1028                             len -= splicedIn;
1029                         }
1030                         do {
1031                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1032                             offset += splicedOut;
1033                             splicedIn -= splicedOut;
1034                         } while (splicedIn > 0);
1035                         if (len == 0) {
1036                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1037                             promise.trySuccess();
1038                             return true;
1039                         }
1040                     }
1041                     return false;
1042                 } finally {
1043                     safeClosePipe(pipeIn);
1044                     safeClosePipe(pipeOut);
1045                 }
1046             } catch (Throwable cause) {
1047                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1048                 promise.tryFailure(cause);
1049                 return true;
1050             }
1051         }
1052     }
1053 
1054     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1055         EpollSocketWritableByteChannel() {
1056             super(socket);
1057             assert fd == socket;
1058         }
1059 
1060         @Override
1061         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1062             return socket.send(buf, pos, limit);
1063         }
1064 
1065         @Override
1066         protected ByteBufAllocator alloc() {
1067             return AbstractEpollStreamChannel.this.alloc();
1068         }
1069     }
1070 }