View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.LeakPresenceDetector;
39  import io.netty.util.internal.PlatformDependent;
40  import io.netty.util.internal.StringUtil;
41  import io.netty.util.internal.logging.InternalLogger;
42  import io.netty.util.internal.logging.InternalLoggerFactory;
43  
44  import java.io.IOException;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.ClosedChannelException;
48  import java.nio.channels.WritableByteChannel;
49  import java.util.Queue;
50  import java.util.concurrent.Executor;
51  
52  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54  import static io.netty.channel.unix.FileDescriptor.pipe;
55  import static io.netty.util.internal.ObjectUtil.checkNotNull;
56  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57  import static io.netty.util.internal.StringUtil.className;
58  
59  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
60      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
61      private static final String EXPECTED_TYPES =
62              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
63                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
64      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
65  
66      private final Runnable flushTask = new Runnable() {
67          @Override
68          public void run() {
69              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
70              // meantime.
71              ((AbstractEpollUnsafe) unsafe()).flush0();
72          }
73      };
74  
75      // Lazy init these if we need to splice(...)
76      private volatile Queue<SpliceInTask> spliceQueue;
77      private FileDescriptor pipeIn;
78      private FileDescriptor pipeOut;
79  
80      private WritableByteChannel byteChannel;
81  
/**
 * Creates a channel with a parent from a raw file descriptor.
 *
 * @param parent the parent {@link Channel}
 * @param fd the raw file descriptor; it is wrapped in a new {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

/**
 * Creates a channel without a parent from a raw file descriptor.
 *
 * @param fd the raw file descriptor; it is wrapped in a new {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

/**
 * Creates a channel without a parent; the {@code active} flag is taken from
 * {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}

/**
 * Creates a channel with a parent, registering interest in {@code EPOLLRDHUP} so that the
 * channel is notified once the remote peer closes the connection.
 *
 * @param parent the parent {@link Channel}
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, true, EpollIoOps.EPOLLRDHUP);
}

/**
 * Creates a channel with a parent and a known remote address, registering interest in
 * {@code EPOLLRDHUP} so that the channel is notified once the remote peer closes the connection.
 *
 * @param parent the parent {@link Channel}
 * @param fd the socket backing this channel
 * @param remote the remote {@link SocketAddress}
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, remote, EpollIoOps.EPOLLRDHUP);
}

/**
 * Creates a channel without a parent, registering interest in {@code EPOLLRDHUP} so that the
 * channel is notified once the remote peer closes the connection.
 *
 * @param fd the socket backing this channel
 * @param active the active flag forwarded to the super class
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active, EpollIoOps.EPOLLRDHUP);
}
108 
109     @Override
110     protected AbstractEpollUnsafe newUnsafe() {
111         return new EpollStreamUnsafe();
112     }
113 
114     @Override
115     public ChannelMetadata metadata() {
116         return METADATA;
117     }
118 
119     /**
120      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
121      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
122      * splice until the {@link ChannelFuture} was canceled or it was failed.
123      *
124      * Please note:
125      * <ul>
126      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
127      *   {@link IllegalArgumentException} is thrown. </li>
128      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
129      *   target {@link AbstractEpollStreamChannel}</li>
130      * </ul>
131      * @deprecated Will be removed in the future.
132      */
133     @Deprecated
134     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
135         return spliceTo(ch, len, newPromise());
136     }
137 
138     /**
139      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
140      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
141      * splice until the {@link ChannelFuture} was canceled or it was failed.
142      *
143      * Please note:
144      * <ul>
145      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
146      *   {@link IllegalArgumentException} is thrown. </li>
147      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
148      *   target {@link AbstractEpollStreamChannel}</li>
149      * </ul>
150      * @deprecated will be removed in the future.
151      */
152     @Deprecated
153     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154                                         final ChannelPromise promise) {
155         if (ch.eventLoop() != eventLoop()) {
156             throw new IllegalArgumentException("EventLoops are not the same.");
157         }
158         checkPositiveOrZero(len, "len");
159         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162         }
163         checkNotNull(promise, "promise");
164         if (!isOpen()) {
165             promise.tryFailure(new ClosedChannelException());
166         } else {
167             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168             failSpliceIfClosed(promise);
169         }
170         return promise;
171     }
172 
173     /**
174      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
175      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
176      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
177      * {@link ChannelFuture} was canceled or it was failed.
178      *
179      * Please note:
180      * <ul>
181      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
182      *   {@link AbstractEpollStreamChannel}</li>
183      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
184      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
185      * </ul>
186      * @deprecated Will be removed in the future.
187      */
188     @Deprecated
189     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
190         return spliceTo(ch, offset, len, newPromise());
191     }
192 
193     /**
194      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
195      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
196      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
197      * {@link ChannelFuture} was canceled or it was failed.
198      *
199      * Please note:
200      * <ul>
201      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
202      *   {@link AbstractEpollStreamChannel}</li>
203      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
204      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
205      * </ul>
206      * @deprecated Will be removed in the future.
207      */
208     @Deprecated
209     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
210                                         final ChannelPromise promise) {
211         checkPositiveOrZero(len, "len");
212         checkPositiveOrZero(offset, "offset");
213         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
214             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
215         }
216         checkNotNull(promise, "promise");
217         if (!isOpen()) {
218             promise.tryFailure(new ClosedChannelException());
219         } else {
220             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
221             failSpliceIfClosed(promise);
222         }
223         return promise;
224     }
225 
226     private void failSpliceIfClosed(ChannelPromise promise) {
227         if (!isOpen()) {
228             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
229             // cases where a future may not be notified otherwise.
230             if (!promise.isDone()) {
231                 final ClosedChannelException ex = new ClosedChannelException();
232                 if (promise.tryFailure(ex)) {
233                     eventLoop().execute(new Runnable() {
234                         @Override
235                         public void run() {
236                             // Call this via the EventLoop as it is a MPSC queue.
237                             clearSpliceQueue(ex);
238                         }
239                     });
240                 }
241             }
242         }
243     }
244 
245     /**
246      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
247      * @param in the collection which contains objects to write.
248      * @param buf the {@link ByteBuf} from which the bytes should be written
249      * @return The value that should be decremented from the write quantum which starts at
250      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
251      * <ul>
252      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
253      *     is encountered</li>
254      *     <li>1 - if a single call to write data was made to the OS</li>
255      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
256      *     no data was accepted</li>
257      * </ul>
258      */
259     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
260         int readableBytes = buf.readableBytes();
261         if (readableBytes == 0) {
262             in.remove();
263             return 0;
264         }
265 
266         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
267             return doWriteBytes(in, buf);
268         } else {
269             ByteBuffer[] nioBuffers = buf.nioBuffers();
270             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
271                     config().getMaxBytesPerGatheringWrite());
272         }
273     }
274 
275     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
276         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
277         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
278         // make a best effort to adjust as OS behavior changes.
279         if (attempted == written) {
280             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
281                 config().setMaxBytesPerGatheringWrite(attempted << 1);
282             }
283         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
284             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
285         }
286     }
287 
288     /**
289      * Write multiple bytes via {@link IovArray}.
290      * @param in the collection which contains objects to write.
291      * @param array The array which contains the content to write.
292      * @return The value that should be decremented from the write quantum which starts at
293      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
294      * <ul>
295      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
296      *     is encountered</li>
297      *     <li>1 - if a single call to write data was made to the OS</li>
298      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
299      *     no data was accepted</li>
300      * </ul>
301      * @throws IOException If an I/O exception occurs during write.
302      */
303     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
304         final long expectedWrittenBytes = array.size();
305         assert expectedWrittenBytes != 0;
306         final int cnt = array.count();
307         assert cnt != 0;
308 
309         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
310         if (localWrittenBytes > 0) {
311             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
312             in.removeBytes(localWrittenBytes);
313             return 1;
314         }
315         return WRITE_STATUS_SNDBUF_FULL;
316     }
317 
318     /**
319      * Write multiple bytes via {@link ByteBuffer} array.
320      * @param in the collection which contains objects to write.
321      * @param nioBuffers The buffers to write.
322      * @param nioBufferCnt The number of buffers to write.
323      * @param expectedWrittenBytes The number of bytes we expect to write.
324      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
325      * @return The value that should be decremented from the write quantum which starts at
326      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
327      * <ul>
328      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
329      *     is encountered</li>
330      *     <li>1 - if a single call to write data was made to the OS</li>
331      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
332      *     no data was accepted</li>
333      * </ul>
334      * @throws IOException If an I/O exception occurs during write.
335      */
336     private int writeBytesMultiple(
337             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
338             long maxBytesPerGatheringWrite) throws IOException {
339         assert expectedWrittenBytes != 0;
340         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
341             expectedWrittenBytes = maxBytesPerGatheringWrite;
342         }
343 
344         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
345         if (localWrittenBytes > 0) {
346             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
347             in.removeBytes(localWrittenBytes);
348             return 1;
349         }
350         return WRITE_STATUS_SNDBUF_FULL;
351     }
352 
353     /**
354      * Write a {@link DefaultFileRegion}
355      * @param in the collection which contains objects to write.
356      * @param region the {@link DefaultFileRegion} from which the bytes should be written
357      * @return The value that should be decremented from the write quantum which starts at
358      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
359      * <ul>
360      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
361      *     is encountered</li>
362      *     <li>1 - if a single call to write data was made to the OS</li>
363      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
364      *     no data was accepted</li>
365      * </ul>
366      */
367     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
368         final long offset = region.transferred();
369         final long regionCount = region.count();
370         if (offset >= regionCount) {
371             in.remove();
372             return 0;
373         }
374 
375         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
376         if (flushedAmount > 0) {
377             in.progress(flushedAmount);
378             if (region.transferred() >= regionCount) {
379                 in.remove();
380             }
381             return 1;
382         } else if (flushedAmount == 0) {
383             validateFileRegion(region, offset);
384         }
385         return WRITE_STATUS_SNDBUF_FULL;
386     }
387 
388     /**
389      * Write a {@link FileRegion}
390      * @param in the collection which contains objects to write.
391      * @param region the {@link FileRegion} from which the bytes should be written
392      * @return The value that should be decremented from the write quantum which starts at
393      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
394      * <ul>
395      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
396      *     is encountered</li>
397      *     <li>1 - if a single call to write data was made to the OS</li>
398      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
399      *     no data was accepted</li>
400      * </ul>
401      */
402     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
403         if (region.transferred() >= region.count()) {
404             in.remove();
405             return 0;
406         }
407 
408         if (byteChannel == null) {
409             byteChannel = new EpollSocketWritableByteChannel();
410         }
411         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
412         if (flushedAmount > 0) {
413             in.progress(flushedAmount);
414             if (region.transferred() >= region.count()) {
415                 in.remove();
416             }
417             return 1;
418         }
419         return WRITE_STATUS_SNDBUF_FULL;
420     }
421 
422     @Override
423     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
424         int writeSpinCount = config().getWriteSpinCount();
425         do {
426             final int msgCount = in.size();
427             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
428             if (msgCount > 1 && in.current() instanceof ByteBuf) {
429                 writeSpinCount -= doWriteMultiple(in);
430             } else if (msgCount == 0) {
431                 // Wrote all messages.
432                 clearFlag(Native.EPOLLOUT);
433                 // Return here so we not set the EPOLLOUT flag.
434                 return;
435             } else {  // msgCount == 1
436                 writeSpinCount -= doWriteSingle(in);
437             }
438 
439             // We do not break the loop here even if the outbound buffer was flushed completely,
440             // because a user might have triggered another write and flush when we notify his or her
441             // listeners.
442         } while (writeSpinCount > 0);
443 
444         if (writeSpinCount == 0) {
445             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
446             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
447             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
448             // and set the EPOLLOUT if necessary.
449             clearFlag(Native.EPOLLOUT);
450 
451             // We used our writeSpin quantum, and should try to write again later.
452             eventLoop().execute(flushTask);
453         } else {
454             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
455             // when it can accept more data.
456             setFlag(Native.EPOLLOUT);
457         }
458     }
459 
460     /**
461      * Attempt to write a single object.
462      * @param in the collection which contains objects to write.
463      * @return The value that should be decremented from the write quantum which starts at
464      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
465      * <ul>
466      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
467      *     is encountered</li>
468      *     <li>1 - if a single call to write data was made to the OS</li>
469      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
470      *     no data was accepted</li>
471      * </ul>
472      * @throws Exception If an I/O error occurs.
473      */
474     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
475         // The outbound buffer contains only one message or it contains a file region.
476         Object msg = in.current();
477         if (msg instanceof ByteBuf) {
478             return writeBytes(in, (ByteBuf) msg);
479         } else if (msg instanceof DefaultFileRegion) {
480             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
481         } else if (msg instanceof FileRegion) {
482             return writeFileRegion(in, (FileRegion) msg);
483         } else if (msg instanceof SpliceOutTask) {
484             if (!((SpliceOutTask) msg).spliceOut()) {
485                 return WRITE_STATUS_SNDBUF_FULL;
486             }
487             in.remove();
488             return 1;
489         } else {
490             // Should never reach here.
491             throw new Error("Unexpected message type: " + className(msg));
492         }
493     }
494 
495     /**
496      * Attempt to write multiple {@link ByteBuf} objects.
497      * @param in the collection which contains objects to write.
498      * @return The value that should be decremented from the write quantum which starts at
499      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
500      * <ul>
501      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
502      *     is encountered</li>
503      *     <li>1 - if a single call to write data was made to the OS</li>
504      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
505      *     no data was accepted</li>
506      * </ul>
507      * @throws Exception If an I/O error occurs.
508      */
509     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
510         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
511         IovArray array =  ((NativeArrays) registration().attachment()).cleanIovArray();
512         array.maxBytes(maxBytesPerGatheringWrite);
513         in.forEachFlushedMessage(array);
514 
515         if (array.count() >= 1) {
516             // TODO: Handle the case where cnt == 1 specially.
517             return writeBytesMultiple(in, array);
518         }
519         // cnt == 0, which means the outbound buffer contained empty buffers only.
520         in.removeBytes(0);
521         return 0;
522     }
523 
524     @Override
525     protected Object filterOutboundMessage(Object msg) {
526         if (msg instanceof ByteBuf) {
527             ByteBuf buf = (ByteBuf) msg;
528             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
529         }
530 
531         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
532             return msg;
533         }
534 
535         throw new UnsupportedOperationException(
536                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
537     }
538 
539     @Override
540     protected final void doShutdownOutput() throws Exception {
541         socket.shutdown(false, true);
542     }
543 
544     private void shutdownInput0(final ChannelPromise promise) {
545         try {
546             socket.shutdown(true, false);
547             promise.setSuccess();
548         } catch (Throwable cause) {
549             promise.setFailure(cause);
550         }
551     }
552 
553     @Override
554     public boolean isOutputShutdown() {
555         return socket.isOutputShutdown();
556     }
557 
558     @Override
559     public boolean isInputShutdown() {
560         return socket.isInputShutdown();
561     }
562 
563     @Override
564     public boolean isShutdown() {
565         return socket.isShutdown();
566     }
567 
568     @Override
569     public ChannelFuture shutdownOutput() {
570         return shutdownOutput(newPromise());
571     }
572 
573     @Override
574     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
575         EventLoop loop = eventLoop();
576         if (loop.inEventLoop()) {
577             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
578         } else {
579             loop.execute(new Runnable() {
580                 @Override
581                 public void run() {
582                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
583                 }
584             });
585         }
586 
587         return promise;
588     }
589 
590     @Override
591     public ChannelFuture shutdownInput() {
592         return shutdownInput(newPromise());
593     }
594 
595     @Override
596     public ChannelFuture shutdownInput(final ChannelPromise promise) {
597         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
598         if (closeExecutor != null) {
599             closeExecutor.execute(new Runnable() {
600                 @Override
601                 public void run() {
602                     shutdownInput0(promise);
603                 }
604             });
605         } else {
606             EventLoop loop = eventLoop();
607             if (loop.inEventLoop()) {
608                 shutdownInput0(promise);
609             } else {
610                 loop.execute(new Runnable() {
611                     @Override
612                     public void run() {
613                         shutdownInput0(promise);
614                     }
615                 });
616             }
617         }
618         return promise;
619     }
620 
621     @Override
622     public ChannelFuture shutdown() {
623         return shutdown(newPromise());
624     }
625 
626     @Override
627     public ChannelFuture shutdown(final ChannelPromise promise) {
628         ChannelFuture shutdownOutputFuture = shutdownOutput();
629         if (shutdownOutputFuture.isDone()) {
630             shutdownOutputDone(shutdownOutputFuture, promise);
631         } else {
632             shutdownOutputFuture.addListener(new ChannelFutureListener() {
633                 @Override
634                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
635                     shutdownOutputDone(shutdownOutputFuture, promise);
636                 }
637             });
638         }
639         return promise;
640     }
641 
642     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
643         ChannelFuture shutdownInputFuture = shutdownInput();
644         if (shutdownInputFuture.isDone()) {
645             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
646         } else {
647             shutdownInputFuture.addListener(new ChannelFutureListener() {
648                 @Override
649                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
650                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
651                 }
652             });
653         }
654     }
655 
656     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
657                               ChannelFuture shutdownInputFuture,
658                               ChannelPromise promise) {
659         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
660         Throwable shutdownInputCause = shutdownInputFuture.cause();
661         if (shutdownOutputCause != null) {
662             if (shutdownInputCause != null) {
663                 logger.debug("Exception suppressed because a previous exception occurred.",
664                         shutdownInputCause);
665             }
666             promise.setFailure(shutdownOutputCause);
667         } else if (shutdownInputCause != null) {
668             promise.setFailure(shutdownInputCause);
669         } else {
670             promise.setSuccess();
671         }
672     }
673 
674     @Override
675     protected void doClose() throws Exception {
676         try {
677             // Calling super.doClose() first so spliceTo(...) will fail on next call.
678             super.doClose();
679         } finally {
680             safeClosePipe(pipeIn);
681             safeClosePipe(pipeOut);
682             clearSpliceQueue(null);
683         }
684     }
685 
686     private void clearSpliceQueue(ClosedChannelException exception) {
687         Queue<SpliceInTask> sQueue = spliceQueue;
688         if (sQueue == null) {
689             return;
690         }
691         for (;;) {
692             SpliceInTask task = sQueue.poll();
693             if (task == null) {
694                 break;
695             }
696             if (exception == null) {
697                 exception = new ClosedChannelException();
698             }
699             task.promise.tryFailure(exception);
700         }
701     }
702 
703     private static void safeClosePipe(FileDescriptor fd) {
704         if (fd != null) {
705             try {
706                 fd.close();
707             } catch (IOException e) {
708                 logger.warn("Error while closing a pipe", e);
709             }
710         }
711     }
712 
713     class EpollStreamUnsafe extends AbstractEpollUnsafe {
714         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
715         @Override
716         protected Executor prepareToClose() {
717             return super.prepareToClose();
718         }
719 
720         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause,
721                                          boolean allDataRead, EpollRecvByteAllocatorHandle allocHandle) {
722             if (byteBuf != null) {
723                 if (byteBuf.isReadable()) {
724                     readPending = false;
725                     pipeline.fireChannelRead(byteBuf);
726                 } else {
727                     byteBuf.release();
728                 }
729             }
730             allocHandle.readComplete();
731             pipeline.fireChannelReadComplete();
732             pipeline.fireExceptionCaught(cause);
733 
734             // If oom will close the read event, release connection.
735             // See https://github.com/netty/netty/issues/10434
736             if (allDataRead ||
737                     cause instanceof OutOfMemoryError ||
738                     cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
739                     cause instanceof IOException) {
740                 shutdownInput(true);
741             }
742         }
743 
744         @Override
745         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
746             return new EpollRecvByteAllocatorStreamingHandle(handle);
747         }
748 
749         @Override
750         void epollInReady() {
751             final ChannelConfig config = config();
752             if (shouldBreakEpollInReady(config)) {
753                 clearEpollIn0();
754                 return;
755             }
756             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
757             final ChannelPipeline pipeline = pipeline();
758             final ByteBufAllocator allocator = config.getAllocator();
759             allocHandle.reset(config);
760 
761             ByteBuf byteBuf = null;
762             boolean allDataRead = false;
763             Queue<SpliceInTask> sQueue = null;
764             try {
765                 do {
766                     if (sQueue != null || (sQueue = spliceQueue) != null) {
767                         SpliceInTask spliceTask = sQueue.peek();
768                         if (spliceTask != null) {
769                             boolean spliceInResult = spliceTask.spliceIn(allocHandle);
770 
771                             if (allocHandle.isReceivedRdHup()) {
772                                 shutdownInput(false);
773                             }
774                             if (spliceInResult) {
775                                 // We need to check if it is still active as if not we removed all SpliceTasks in
776                                 // doClose(...)
777                                 if (isActive()) {
778                                     sQueue.remove();
779                                 }
780                                 continue;
781                             } else {
782                                 break;
783                             }
784                         }
785                     }
786 
787                     // we use a direct buffer here as the native implementations only be able
788                     // to handle direct buffers.
789                     byteBuf = allocHandle.allocate(allocator);
790                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
791                     if (allocHandle.lastBytesRead() <= 0) {
792                         // nothing was read, release the buffer.
793                         byteBuf.release();
794                         byteBuf = null;
795                         allDataRead = allocHandle.lastBytesRead() < 0;
796                         if (allDataRead) {
797                             // There is nothing left to read as we received an EOF.
798                             readPending = false;
799                         }
800                         break;
801                     }
802                     allocHandle.incMessagesRead(1);
803                     readPending = false;
804                     pipeline.fireChannelRead(byteBuf);
805                     byteBuf = null;
806 
807                     if (shouldBreakEpollInReady(config)) {
808                         // We need to do this for two reasons:
809                         //
810                         // - If the input was shutdown in between (which may be the case when the user did it in the
811                         //   fireChannelRead(...) method we should not try to read again to not produce any
812                         //   miss-leading exceptions.
813                         //
814                         // - If the user closes the channel we need to ensure we not try to read from it again as
815                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
816                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
817                         //   reading data from a filedescriptor that belongs to another socket then the socket that
818                         //   was "wrapped" by this Channel implementation.
819                         break;
820                     }
821                 } while (allocHandle.continueReading());
822 
823                 allocHandle.readComplete();
824                 pipeline.fireChannelReadComplete();
825 
826                 if (allDataRead) {
827                     shutdownInput(true);
828                 }
829             } catch (Throwable t) {
830                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
831             } finally {
832                 if (sQueue == null) {
833                     if (shouldStopReading(config)) {
834                         clearEpollIn();
835                     }
836                 } else {
837                     if (!config.isAutoRead()) {
838                         clearEpollIn();
839                     }
840                 }
841             }
842         }
843     }
844 
845     private void addToSpliceQueue(final SpliceInTask task) {
846         Queue<SpliceInTask> sQueue = spliceQueue;
847         if (sQueue == null) {
848             synchronized (this) {
849                 sQueue = spliceQueue;
850                 if (sQueue == null) {
851                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
852                 }
853             }
854         }
855         sQueue.add(task);
856     }
857 
858     protected abstract class SpliceInTask {
859         final ChannelPromise promise;
860         int len;
861 
862         protected SpliceInTask(int len, ChannelPromise promise) {
863             this.promise = promise;
864             this.len = len;
865         }
866 
867         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
868 
869         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
870             // calculate the maximum amount of data we are allowed to splice
871             int length = Math.min(handle.guess(), len);
872             int splicedIn = 0;
873             for (;;) {
874                 // Splicing until there is nothing left to splice.
875                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
876                 handle.lastBytesRead(localSplicedIn);
877                 if (localSplicedIn == 0) {
878                     break;
879                 }
880                 splicedIn += localSplicedIn;
881                 length -= localSplicedIn;
882             }
883 
884             return splicedIn;
885         }
886     }
887 
888     // Let it directly implement channelFutureListener as well to reduce object creation.
889     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
890         private final AbstractEpollStreamChannel ch;
891 
892         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
893             super(len, promise);
894             this.ch = ch;
895         }
896 
897         @Override
898         public void operationComplete(ChannelFuture future) throws Exception {
899             if (!future.isSuccess()) {
900                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
901                 promise.tryFailure(future.cause());
902             }
903         }
904 
905         @Override
906         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
907             assert ch.eventLoop().inEventLoop();
908             if (len == 0) {
909                 // Use trySuccess() as the promise might already be closed by spliceTo(...)
910                 promise.trySuccess();
911                 return true;
912             }
913             try {
914                 // We create the pipe on the target channel as this will allow us to just handle pending writes
915                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
916                 // on multiple Channels pointing to one target Channel.
917                 FileDescriptor pipeOut = ch.pipeOut;
918                 if (pipeOut == null) {
919                     // Create a new pipe as non was created before.
920                     FileDescriptor[] pipe = pipe();
921                     ch.pipeIn = pipe[0];
922                     pipeOut = ch.pipeOut = pipe[1];
923                 }
924 
925                 int splicedIn = spliceIn(pipeOut, handle);
926                 if (splicedIn > 0) {
927                     // Integer.MAX_VALUE is a special value which will result in splice forever.
928                     if (len != Integer.MAX_VALUE) {
929                         len -= splicedIn;
930                     }
931 
932                     // Depending on if we are done with splicing inbound data we set the right promise for the
933                     // outbound splicing.
934                     final ChannelPromise splicePromise;
935                     if (len == 0) {
936                         splicePromise = promise;
937                     } else {
938                         splicePromise = ch.newPromise().addListener(this);
939                     }
940 
941                     boolean autoRead = config().isAutoRead();
942 
943                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
944                     // case.
945                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
946                     ch.unsafe().flush();
947                     if (autoRead && !splicePromise.isDone()) {
948                         // Write was not done which means the target channel was not writable. In this case we need to
949                         // disable reading until we are done with splicing to the target channel because:
950                         //
951                         // - The user may want to trigger another splice operation once the splicing was complete.
952                         config().setAutoRead(false);
953                     }
954                 }
955 
956                 return len == 0;
957             } catch (Throwable cause) {
958                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
959                 promise.tryFailure(cause);
960                 return true;
961             }
962         }
963     }
964 
965     private final class SpliceOutTask {
966         private final AbstractEpollStreamChannel ch;
967         private final boolean autoRead;
968         private int len;
969 
970         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
971             this.ch = ch;
972             this.len = len;
973             this.autoRead = autoRead;
974         }
975 
976         public boolean spliceOut() throws Exception {
977             assert ch.eventLoop().inEventLoop();
978             try {
979                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
980                 len -= splicedOut;
981                 if (len == 0) {
982                     if (autoRead) {
983                         // AutoRead was used and we spliced everything so start reading again
984                         config().setAutoRead(true);
985                     }
986                     return true;
987                 }
988                 return false;
989             } catch (IOException e) {
990                 if (autoRead) {
991                     // AutoRead was used and we spliced everything so start reading again
992                     config().setAutoRead(true);
993                 }
994                 throw e;
995             }
996         }
997     }
998 
999     private final class SpliceFdTask extends SpliceInTask {
1000         private final FileDescriptor fd;
1001         private final ChannelPromise promise;
1002         private int offset;
1003 
1004         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1005             super(len, promise);
1006             this.fd = fd;
1007             this.promise = promise;
1008             this.offset = offset;
1009         }
1010 
1011         @Override
1012         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1013             assert eventLoop().inEventLoop();
1014             if (len == 0) {
1015                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1016                 promise.trySuccess();
1017                 return true;
1018             }
1019 
1020             try {
1021                 FileDescriptor[] pipe = pipe();
1022                 FileDescriptor pipeIn = pipe[0];
1023                 FileDescriptor pipeOut = pipe[1];
1024                 try {
1025                     int splicedIn = spliceIn(pipeOut, handle);
1026                     if (splicedIn > 0) {
1027                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1028                         if (len != Integer.MAX_VALUE) {
1029                             len -= splicedIn;
1030                         }
1031                         do {
1032                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1033                             offset += splicedOut;
1034                             splicedIn -= splicedOut;
1035                         } while (splicedIn > 0);
1036                         if (len == 0) {
1037                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1038                             promise.trySuccess();
1039                             return true;
1040                         }
1041                     }
1042                     return false;
1043                 } finally {
1044                     safeClosePipe(pipeIn);
1045                     safeClosePipe(pipeOut);
1046                 }
1047             } catch (Throwable cause) {
1048                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1049                 promise.tryFailure(cause);
1050                 return true;
1051             }
1052         }
1053     }
1054 
1055     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1056         EpollSocketWritableByteChannel() {
1057             super(socket);
1058             assert fd == socket;
1059         }
1060 
1061         @Override
1062         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1063             return socket.send(buf, pos, limit);
1064         }
1065 
1066         @Override
1067         protected ByteBufAllocator alloc() {
1068             return AbstractEpollStreamChannel.this.alloc();
1069         }
1070     }
1071 }