View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.ThrowableUtil;
41  import io.netty.util.internal.UnstableApi;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.ClosedChannelException;
49  import java.nio.channels.WritableByteChannel;
50  import java.util.Queue;
51  import java.util.concurrent.Executor;
52  
53  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.FileDescriptor.pipe;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
58  
59  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
60      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
61      private static final String EXPECTED_TYPES =
62              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
63                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
64      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
65      private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
66              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
67                      AbstractEpollStreamChannel.class, "clearSpliceQueue()");
68      private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
69              new ClosedChannelException(),
70              AbstractEpollStreamChannel.class, "spliceTo(...)");
71      private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
72              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
73              AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
74      private final Runnable flushTask = new Runnable() {
75          @Override
76          public void run() {
77              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
78              // meantime.
79              ((AbstractEpollUnsafe) unsafe()).flush0();
80          }
81      };
82      private Queue<SpliceInTask> spliceQueue;
83  
84      // Lazy init these if we need to splice(...)
85      private FileDescriptor pipeIn;
86      private FileDescriptor pipeOut;
87  
88      private WritableByteChannel byteChannel;
89  
90      protected AbstractEpollStreamChannel(Channel parent, int fd) {
91          this(parent, new LinuxSocket(fd));
92      }
93  
94      protected AbstractEpollStreamChannel(int fd) {
95          this(new LinuxSocket(fd));
96      }
97  
98      AbstractEpollStreamChannel(LinuxSocket fd) {
99          this(fd, isSoErrorZero(fd));
100     }
101 
102     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
103         super(parent, fd, true);
104         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
105         flags |= Native.EPOLLRDHUP;
106     }
107 
108     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
109         super(parent, fd, remote);
110         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
111         flags |= Native.EPOLLRDHUP;
112     }
113 
114     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
115         super(null, fd, active);
116         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
117         flags |= Native.EPOLLRDHUP;
118     }
119 
120     @Override
121     protected AbstractEpollUnsafe newUnsafe() {
122         return new EpollStreamUnsafe();
123     }
124 
125     @Override
126     public ChannelMetadata metadata() {
127         return METADATA;
128     }
129 
130     /**
131      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
132      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
133      * splice until the {@link ChannelFuture} was canceled or it was failed.
134      *
135      * Please note:
136      * <ul>
137      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
138      *   {@link IllegalArgumentException} is thrown. </li>
139      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
140      *   target {@link AbstractEpollStreamChannel}</li>
141      * </ul>
142      *
143      */
144     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
145         return spliceTo(ch, len, newPromise());
146     }
147 
148     /**
149      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
150      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
151      * splice until the {@link ChannelFuture} was canceled or it was failed.
152      *
153      * Please note:
154      * <ul>
155      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
156      *   {@link IllegalArgumentException} is thrown. </li>
157      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
158      *   target {@link AbstractEpollStreamChannel}</li>
159      * </ul>
160      *
161      */
162     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
163                                         final ChannelPromise promise) {
164         if (ch.eventLoop() != eventLoop()) {
165             throw new IllegalArgumentException("EventLoops are not the same.");
166         }
167         checkPositiveOrZero(len, "len");
168         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
169                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
170             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
171         }
172         checkNotNull(promise, "promise");
173         if (!isOpen()) {
174             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
175         } else {
176             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
177             failSpliceIfClosed(promise);
178         }
179         return promise;
180     }
181 
182     /**
183      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
184      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
185      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
186      * {@link ChannelFuture} was canceled or it was failed.
187      *
188      * Please note:
189      * <ul>
190      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
191      *   {@link AbstractEpollStreamChannel}</li>
192      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
193      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
194      * </ul>
195      */
196     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
197         return spliceTo(ch, offset, len, newPromise());
198     }
199 
200     /**
201      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
202      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
203      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
204      * {@link ChannelFuture} was canceled or it was failed.
205      *
206      * Please note:
207      * <ul>
208      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
209      *   {@link AbstractEpollStreamChannel}</li>
210      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
211      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
212      * </ul>
213      */
214     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
215                                         final ChannelPromise promise) {
216         checkPositiveOrZero(len, "len");
217         checkPositiveOrZero(offset, "offser");
218         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
219             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
220         }
221         checkNotNull(promise, "promise");
222         if (!isOpen()) {
223             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
224         } else {
225             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
226             failSpliceIfClosed(promise);
227         }
228         return promise;
229     }
230 
231     private void failSpliceIfClosed(ChannelPromise promise) {
232         if (!isOpen()) {
233             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
234             // cases where a future may not be notified otherwise.
235             if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
236                 eventLoop().execute(new Runnable() {
237                     @Override
238                     public void run() {
239                         // Call this via the EventLoop as it is a MPSC queue.
240                         clearSpliceQueue();
241                     }
242                 });
243             }
244         }
245     }
246 
247     /**
248      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
249      * @param in the collection which contains objects to write.
250      * @param buf the {@link ByteBuf} from which the bytes should be written
251      * @return The value that should be decremented from the write quantum which starts at
252      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
253      * <ul>
254      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
255      *     is encountered</li>
256      *     <li>1 - if a single call to write data was made to the OS</li>
257      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
258      *     no data was accepted</li>
259      * </ul>
260      */
261     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
262         int readableBytes = buf.readableBytes();
263         if (readableBytes == 0) {
264             in.remove();
265             return 0;
266         }
267 
268         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
269             return doWriteBytes(in, buf);
270         } else {
271             ByteBuffer[] nioBuffers = buf.nioBuffers();
272             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
273                     config().getMaxBytesPerGatheringWrite());
274         }
275     }
276 
277     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
278         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
279         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
280         // make a best effort to adjust as OS behavior changes.
281         if (attempted == written) {
282             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
283                 config().setMaxBytesPerGatheringWrite(attempted << 1);
284             }
285         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
286             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
287         }
288     }
289 
290     /**
291      * Write multiple bytes via {@link IovArray}.
292      * @param in the collection which contains objects to write.
293      * @param array The array which contains the content to write.
294      * @return The value that should be decremented from the write quantum which starts at
295      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
296      * <ul>
297      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
298      *     is encountered</li>
299      *     <li>1 - if a single call to write data was made to the OS</li>
300      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
301      *     no data was accepted</li>
302      * </ul>
303      * @throws IOException If an I/O exception occurs during write.
304      */
305     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
306         final long expectedWrittenBytes = array.size();
307         assert expectedWrittenBytes != 0;
308         final int cnt = array.count();
309         assert cnt != 0;
310 
311         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
312         if (localWrittenBytes > 0) {
313             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
314             in.removeBytes(localWrittenBytes);
315             return 1;
316         }
317         return WRITE_STATUS_SNDBUF_FULL;
318     }
319 
320     /**
321      * Write multiple bytes via {@link ByteBuffer} array.
322      * @param in the collection which contains objects to write.
323      * @param nioBuffers The buffers to write.
324      * @param nioBufferCnt The number of buffers to write.
325      * @param expectedWrittenBytes The number of bytes we expect to write.
326      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
327      * @return The value that should be decremented from the write quantum which starts at
328      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
329      * <ul>
330      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
331      *     is encountered</li>
332      *     <li>1 - if a single call to write data was made to the OS</li>
333      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
334      *     no data was accepted</li>
335      * </ul>
336      * @throws IOException If an I/O exception occurs during write.
337      */
338     private int writeBytesMultiple(
339             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
340             long maxBytesPerGatheringWrite) throws IOException {
341         assert expectedWrittenBytes != 0;
342         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
343             expectedWrittenBytes = maxBytesPerGatheringWrite;
344         }
345 
346         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
347         if (localWrittenBytes > 0) {
348             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
349             in.removeBytes(localWrittenBytes);
350             return 1;
351         }
352         return WRITE_STATUS_SNDBUF_FULL;
353     }
354 
355     /**
356      * Write a {@link DefaultFileRegion}
357      * @param in the collection which contains objects to write.
358      * @param region the {@link DefaultFileRegion} from which the bytes should be written
359      * @return The value that should be decremented from the write quantum which starts at
360      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
361      * <ul>
362      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
363      *     is encountered</li>
364      *     <li>1 - if a single call to write data was made to the OS</li>
365      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
366      *     no data was accepted</li>
367      * </ul>
368      */
369     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
370         final long offset = region.transferred();
371         final long regionCount = region.count();
372         if (offset >= regionCount) {
373             in.remove();
374             return 0;
375         }
376 
377         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
378         if (flushedAmount > 0) {
379             in.progress(flushedAmount);
380             if (region.transferred() >= regionCount) {
381                 in.remove();
382             }
383             return 1;
384         } else if (flushedAmount == 0) {
385             validateFileRegion(region, offset);
386         }
387         return WRITE_STATUS_SNDBUF_FULL;
388     }
389 
390     /**
391      * Write a {@link FileRegion}
392      * @param in the collection which contains objects to write.
393      * @param region the {@link FileRegion} from which the bytes should be written
394      * @return The value that should be decremented from the write quantum which starts at
395      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
396      * <ul>
397      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
398      *     is encountered</li>
399      *     <li>1 - if a single call to write data was made to the OS</li>
400      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
401      *     no data was accepted</li>
402      * </ul>
403      */
404     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
405         if (region.transferred() >= region.count()) {
406             in.remove();
407             return 0;
408         }
409 
410         if (byteChannel == null) {
411             byteChannel = new EpollSocketWritableByteChannel();
412         }
413         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
414         if (flushedAmount > 0) {
415             in.progress(flushedAmount);
416             if (region.transferred() >= region.count()) {
417                 in.remove();
418             }
419             return 1;
420         }
421         return WRITE_STATUS_SNDBUF_FULL;
422     }
423 
424     @Override
425     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
426         int writeSpinCount = config().getWriteSpinCount();
427         do {
428             final int msgCount = in.size();
429             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
430             if (msgCount > 1 && in.current() instanceof ByteBuf) {
431                 writeSpinCount -= doWriteMultiple(in);
432             } else if (msgCount == 0) {
433                 // Wrote all messages.
434                 clearFlag(Native.EPOLLOUT);
435                 // Return here so we not set the EPOLLOUT flag.
436                 return;
437             } else {  // msgCount == 1
438                 writeSpinCount -= doWriteSingle(in);
439             }
440 
441             // We do not break the loop here even if the outbound buffer was flushed completely,
442             // because a user might have triggered another write and flush when we notify his or her
443             // listeners.
444         } while (writeSpinCount > 0);
445 
446         if (writeSpinCount == 0) {
447             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
448             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
449             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
450             // and set the EPOLLOUT if necessary.
451             clearFlag(Native.EPOLLOUT);
452 
453             // We used our writeSpin quantum, and should try to write again later.
454             eventLoop().execute(flushTask);
455         } else {
456             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
457             // when it can accept more data.
458             setFlag(Native.EPOLLOUT);
459         }
460     }
461 
462     /**
463      * Attempt to write a single object.
464      * @param in the collection which contains objects to write.
465      * @return The value that should be decremented from the write quantum which starts at
466      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
467      * <ul>
468      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
469      *     is encountered</li>
470      *     <li>1 - if a single call to write data was made to the OS</li>
471      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
472      *     no data was accepted</li>
473      * </ul>
474      * @throws Exception If an I/O error occurs.
475      */
476     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
477         // The outbound buffer contains only one message or it contains a file region.
478         Object msg = in.current();
479         if (msg instanceof ByteBuf) {
480             return writeBytes(in, (ByteBuf) msg);
481         } else if (msg instanceof DefaultFileRegion) {
482             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
483         } else if (msg instanceof FileRegion) {
484             return writeFileRegion(in, (FileRegion) msg);
485         } else if (msg instanceof SpliceOutTask) {
486             if (!((SpliceOutTask) msg).spliceOut()) {
487                 return WRITE_STATUS_SNDBUF_FULL;
488             }
489             in.remove();
490             return 1;
491         } else {
492             // Should never reach here.
493             throw new Error();
494         }
495     }
496 
497     /**
498      * Attempt to write multiple {@link ByteBuf} objects.
499      * @param in the collection which contains objects to write.
500      * @return The value that should be decremented from the write quantum which starts at
501      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
502      * <ul>
503      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
504      *     is encountered</li>
505      *     <li>1 - if a single call to write data was made to the OS</li>
506      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
507      *     no data was accepted</li>
508      * </ul>
509      * @throws Exception If an I/O error occurs.
510      */
511     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
512         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
513         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
514         array.maxBytes(maxBytesPerGatheringWrite);
515         in.forEachFlushedMessage(array);
516 
517         if (array.count() >= 1) {
518             // TODO: Handle the case where cnt == 1 specially.
519             return writeBytesMultiple(in, array);
520         }
521         // cnt == 0, which means the outbound buffer contained empty buffers only.
522         in.removeBytes(0);
523         return 0;
524     }
525 
526     @Override
527     protected Object filterOutboundMessage(Object msg) {
528         if (msg instanceof ByteBuf) {
529             ByteBuf buf = (ByteBuf) msg;
530             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
531         }
532 
533         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
534             return msg;
535         }
536 
537         throw new UnsupportedOperationException(
538                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
539     }
540 
541     @UnstableApi
542     @Override
543     protected final void doShutdownOutput() throws Exception {
544         socket.shutdown(false, true);
545     }
546 
547     private void shutdownInput0(final ChannelPromise promise) {
548         try {
549             socket.shutdown(true, false);
550             promise.setSuccess();
551         } catch (Throwable cause) {
552             promise.setFailure(cause);
553         }
554     }
555 
556     @Override
557     public boolean isOutputShutdown() {
558         return socket.isOutputShutdown();
559     }
560 
561     @Override
562     public boolean isInputShutdown() {
563         return socket.isInputShutdown();
564     }
565 
566     @Override
567     public boolean isShutdown() {
568         return socket.isShutdown();
569     }
570 
571     @Override
572     public ChannelFuture shutdownOutput() {
573         return shutdownOutput(newPromise());
574     }
575 
576     @Override
577     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
578         EventLoop loop = eventLoop();
579         if (loop.inEventLoop()) {
580             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
581         } else {
582             loop.execute(new Runnable() {
583                 @Override
584                 public void run() {
585                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
586                 }
587             });
588         }
589 
590         return promise;
591     }
592 
593     @Override
594     public ChannelFuture shutdownInput() {
595         return shutdownInput(newPromise());
596     }
597 
598     @Override
599     public ChannelFuture shutdownInput(final ChannelPromise promise) {
600         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
601         if (closeExecutor != null) {
602             closeExecutor.execute(new Runnable() {
603                 @Override
604                 public void run() {
605                     shutdownInput0(promise);
606                 }
607             });
608         } else {
609             EventLoop loop = eventLoop();
610             if (loop.inEventLoop()) {
611                 shutdownInput0(promise);
612             } else {
613                 loop.execute(new Runnable() {
614                     @Override
615                     public void run() {
616                         shutdownInput0(promise);
617                     }
618                 });
619             }
620         }
621         return promise;
622     }
623 
624     @Override
625     public ChannelFuture shutdown() {
626         return shutdown(newPromise());
627     }
628 
629     @Override
630     public ChannelFuture shutdown(final ChannelPromise promise) {
631         ChannelFuture shutdownOutputFuture = shutdownOutput();
632         if (shutdownOutputFuture.isDone()) {
633             shutdownOutputDone(shutdownOutputFuture, promise);
634         } else {
635             shutdownOutputFuture.addListener(new ChannelFutureListener() {
636                 @Override
637                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
638                     shutdownOutputDone(shutdownOutputFuture, promise);
639                 }
640             });
641         }
642         return promise;
643     }
644 
645     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
646         ChannelFuture shutdownInputFuture = shutdownInput();
647         if (shutdownInputFuture.isDone()) {
648             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
649         } else {
650             shutdownInputFuture.addListener(new ChannelFutureListener() {
651                 @Override
652                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
653                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
654                 }
655             });
656         }
657     }
658 
659     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
660                               ChannelFuture shutdownInputFuture,
661                               ChannelPromise promise) {
662         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
663         Throwable shutdownInputCause = shutdownInputFuture.cause();
664         if (shutdownOutputCause != null) {
665             if (shutdownInputCause != null) {
666                 logger.debug("Exception suppressed because a previous exception occurred.",
667                         shutdownInputCause);
668             }
669             promise.setFailure(shutdownOutputCause);
670         } else if (shutdownInputCause != null) {
671             promise.setFailure(shutdownInputCause);
672         } else {
673             promise.setSuccess();
674         }
675     }
676 
677     @Override
678     protected void doClose() throws Exception {
679         try {
680             // Calling super.doClose() first so spliceTo(...) will fail on next call.
681             super.doClose();
682         } finally {
683             safeClosePipe(pipeIn);
684             safeClosePipe(pipeOut);
685             clearSpliceQueue();
686         }
687     }
688 
689     private void clearSpliceQueue() {
690         if (spliceQueue == null) {
691             return;
692         }
693         for (;;) {
694             SpliceInTask task = spliceQueue.poll();
695             if (task == null) {
696                 break;
697             }
698             task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
699         }
700     }
701 
702     private static void safeClosePipe(FileDescriptor fd) {
703         if (fd != null) {
704             try {
705                 fd.close();
706             } catch (IOException e) {
707                 if (logger.isWarnEnabled()) {
708                     logger.warn("Error while closing a pipe", e);
709                 }
710             }
711         }
712     }
713 
    /**
     * Unsafe implementation for epoll stream channels. When the channel becomes readable it either services the
     * head of the splice queue or reads bytes into buffers and fires them through the pipeline.
     */
    class EpollStreamUnsafe extends AbstractEpollUnsafe {
        // Overridden here just to be able to access this method from AbstractEpollStreamChannel
        @Override
        protected Executor prepareToClose() {
            return super.prepareToClose();
        }

        /**
         * Handles a failure thrown from the read loop.
         * A partially filled buffer is still delivered; an empty one is released.
         * Read-complete and the exception are then fired through the pipeline.
         * Finally, the input side is shut down if EOF had already been seen or the failure is an
         * {@link IOException}.
         *
         * @param pipeline    the pipeline to notify
         * @param byteBuf     the buffer in use when the failure happened, may be {@code null}
         * @param cause       the failure
         * @param close       {@code true} if EOF had already been observed in this read loop
         * @param allocHandle the allocator handle driving this read loop
         */
        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                EpollRecvByteAllocatorHandle allocHandle) {
            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);
            if (close || cause instanceof IOException) {
                shutdownInput(false);
            }
        }

        /**
         * Wraps the given handle in the streaming variant of the epoll allocator handle.
         */
        @Override
        EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
            return new EpollRecvByteAllocatorStreamingHandle(handle);
        }

        /**
         * Called when the channel is readable.
         * If a splice task is queued, inbound data goes to that task. Otherwise bytes are read into buffers and
         * fired through the pipeline, until the allocator handle says to stop, nothing more is read, or
         * {@code shouldBreakEpollInReady} becomes true.
         */
        @Override
        void epollInReady() {
            final ChannelConfig config = config();
            if (shouldBreakEpollInReady(config)) {
                // NOTE(review): presumably stops EPOLLIN interest since we will not read now - confirm in
                // AbstractEpollChannel.
                clearEpollIn0();
                return;
            }
            final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);
            epollInBefore();

            // Tracked outside the loop so that handleReadException(...) can release or deliver the current buffer.
            ByteBuf byteBuf = null;
            // Set once EOF is observed (a negative read result).
            boolean close = false;
            try {
                do {
                    if (spliceQueue != null) {
                        SpliceInTask spliceTask = spliceQueue.peek();
                        if (spliceTask != null) {
                            // spliceIn(...) returns true once the task is finished (done or failed), false if it
                            // still expects more data.
                            if (spliceTask.spliceIn(allocHandle)) {
                                // We need to check if it is still active as if not we removed all SpliceTasks in
                                // doClose(...)
                                if (isActive()) {
                                    spliceQueue.remove();
                                }
                                // In a do/while, continue jumps to the allocHandle.continueReading() check.
                                continue;
                            } else {
                                break;
                            }
                        }
                    }

                    // We use a direct buffer here because the native implementation can only
                    // handle direct buffers.
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read, release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    // Ownership passed to the pipeline; do not release it on a later failure.
                    byteBuf = null;

                    if (shouldBreakEpollInReady(config)) {
                        // We need to do this for two reasons:
                        //
                        // - If the input was shut down in between (which may be the case when the user did it in the
                        //   fireChannelRead(...) method), we should not try to read again, so we do not produce
                        //   misleading exceptions.
                        //
                        // - If the user closes the channel, we need to ensure we do not try to read from it again, as
                        //   the file descriptor may already be re-used by the OS if the system is handling a lot of
                        //   concurrent connections and so needs a lot of file descriptors. If we do not do this, we
                        //   risk reading data from a file descriptor that belongs to a different socket than the one
                        //   that was "wrapped" by this Channel implementation.
                        break;
                    }
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    shutdownInput(false);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                epollInFinally(config);
            }
        }
    }
828 
829     private void addToSpliceQueue(final SpliceInTask task) {
830         EventLoop eventLoop = eventLoop();
831         if (eventLoop.inEventLoop()) {
832             addToSpliceQueue0(task);
833         } else {
834             eventLoop.execute(new Runnable() {
835                 @Override
836                 public void run() {
837                     addToSpliceQueue0(task);
838                 }
839             });
840         }
841     }
842 
843     private void addToSpliceQueue0(SpliceInTask task) {
844         if (spliceQueue == null) {
845             spliceQueue = PlatformDependent.newMpscQueue();
846         }
847         spliceQueue.add(task);
848     }
849 
850     protected abstract class SpliceInTask {
851         final ChannelPromise promise;
852         int len;
853 
854         protected SpliceInTask(int len, ChannelPromise promise) {
855             this.promise = promise;
856             this.len = len;
857         }
858 
859         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
860 
861         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
862             // calculate the maximum amount of data we are allowed to splice
863             int length = Math.min(handle.guess(), len);
864             int splicedIn = 0;
865             for (;;) {
866                 // Splicing until there is nothing left to splice.
867                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
868                 if (localSplicedIn == 0) {
869                     break;
870                 }
871                 splicedIn += localSplicedIn;
872                 length -= localSplicedIn;
873             }
874 
875             return splicedIn;
876         }
877     }
878 
879     // Let it directly implement channelFutureListener as well to reduce object creation.
880     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
881         private final AbstractEpollStreamChannel ch;
882 
883         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
884             super(len, promise);
885             this.ch = ch;
886         }
887 
888         @Override
889         public void operationComplete(ChannelFuture future) throws Exception {
890             if (!future.isSuccess()) {
891                 promise.setFailure(future.cause());
892             }
893         }
894 
895         @Override
896         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
897             assert ch.eventLoop().inEventLoop();
898             if (len == 0) {
899                 promise.setSuccess();
900                 return true;
901             }
902             try {
903                 // We create the pipe on the target channel as this will allow us to just handle pending writes
904                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
905                 // on multiple Channels pointing to one target Channel.
906                 FileDescriptor pipeOut = ch.pipeOut;
907                 if (pipeOut == null) {
908                     // Create a new pipe as non was created before.
909                     FileDescriptor[] pipe = pipe();
910                     ch.pipeIn = pipe[0];
911                     pipeOut = ch.pipeOut = pipe[1];
912                 }
913 
914                 int splicedIn = spliceIn(pipeOut, handle);
915                 if (splicedIn > 0) {
916                     // Integer.MAX_VALUE is a special value which will result in splice forever.
917                     if (len != Integer.MAX_VALUE) {
918                         len -= splicedIn;
919                     }
920 
921                     // Depending on if we are done with splicing inbound data we set the right promise for the
922                     // outbound splicing.
923                     final ChannelPromise splicePromise;
924                     if (len == 0) {
925                         splicePromise = promise;
926                     } else {
927                         splicePromise = ch.newPromise().addListener(this);
928                     }
929 
930                     boolean autoRead = config().isAutoRead();
931 
932                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
933                     // case.
934                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
935                     ch.unsafe().flush();
936                     if (autoRead && !splicePromise.isDone()) {
937                         // Write was not done which means the target channel was not writable. In this case we need to
938                         // disable reading until we are done with splicing to the target channel because:
939                         //
940                         // - The user may want to to trigger another splice operation once the splicing was complete.
941                         config().setAutoRead(false);
942                     }
943                 }
944 
945                 return len == 0;
946             } catch (Throwable cause) {
947                 promise.setFailure(cause);
948                 return true;
949             }
950         }
951     }
952 
953     private final class SpliceOutTask {
954         private final AbstractEpollStreamChannel ch;
955         private final boolean autoRead;
956         private int len;
957 
958         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
959             this.ch = ch;
960             this.len = len;
961             this.autoRead = autoRead;
962         }
963 
964         public boolean spliceOut() throws Exception {
965             assert ch.eventLoop().inEventLoop();
966             try {
967                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
968                 len -= splicedOut;
969                 if (len == 0) {
970                     if (autoRead) {
971                         // AutoRead was used and we spliced everything so start reading again
972                         config().setAutoRead(true);
973                     }
974                     return true;
975                 }
976                 return false;
977             } catch (IOException e) {
978                 if (autoRead) {
979                     // AutoRead was used and we spliced everything so start reading again
980                     config().setAutoRead(true);
981                 }
982                 throw e;
983             }
984         }
985     }
986 
987     private final class SpliceFdTask extends SpliceInTask {
988         private final FileDescriptor fd;
989         private final ChannelPromise promise;
990         private final int offset;
991 
992         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
993             super(len, promise);
994             this.fd = fd;
995             this.promise = promise;
996             this.offset = offset;
997         }
998 
999         @Override
1000         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1001             assert eventLoop().inEventLoop();
1002             if (len == 0) {
1003                 promise.setSuccess();
1004                 return true;
1005             }
1006 
1007             try {
1008                 FileDescriptor[] pipe = pipe();
1009                 FileDescriptor pipeIn = pipe[0];
1010                 FileDescriptor pipeOut = pipe[1];
1011                 try {
1012                     int splicedIn = spliceIn(pipeOut, handle);
1013                     if (splicedIn > 0) {
1014                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1015                         if (len != Integer.MAX_VALUE) {
1016                             len -= splicedIn;
1017                         }
1018                         do {
1019                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1020                             splicedIn -= splicedOut;
1021                         } while (splicedIn > 0);
1022                         if (len == 0) {
1023                             promise.setSuccess();
1024                             return true;
1025                         }
1026                     }
1027                     return false;
1028                 } finally {
1029                     safeClosePipe(pipeIn);
1030                     safeClosePipe(pipeOut);
1031                 }
1032             } catch (Throwable cause) {
1033                 promise.setFailure(cause);
1034                 return true;
1035             }
1036         }
1037     }
1038 
1039     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1040         EpollSocketWritableByteChannel() {
1041             super(socket);
1042         }
1043 
1044         @Override
1045         protected ByteBufAllocator alloc() {
1046             return AbstractEpollStreamChannel.this.alloc();
1047         }
1048     }
1049 }