View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.ThrowableUtil;
41  import io.netty.util.internal.UnstableApi;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.ClosedChannelException;
49  import java.nio.channels.WritableByteChannel;
50  import java.util.Queue;
51  import java.util.concurrent.Executor;
52  
53  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.FileDescriptor.pipe;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60      private static final String EXPECTED_TYPES =
61              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64      private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
65              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
66                      AbstractEpollStreamChannel.class, "clearSpliceQueue()");
67      private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
68              new ClosedChannelException(),
69              AbstractEpollStreamChannel.class, "spliceTo(...)");
70      private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
71              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
72              AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
73      private final Runnable flushTask = new Runnable() {
74          @Override
75          public void run() {
76              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
77              // meantime.
78              ((AbstractEpollUnsafe) unsafe()).flush0();
79          }
80      };
81      private Queue<SpliceInTask> spliceQueue;
82  
83      // Lazy init these if we need to splice(...)
84      private FileDescriptor pipeIn;
85      private FileDescriptor pipeOut;
86  
87      private WritableByteChannel byteChannel;
88  
89      protected AbstractEpollStreamChannel(Channel parent, int fd) {
90          this(parent, new LinuxSocket(fd));
91      }
92  
93      protected AbstractEpollStreamChannel(int fd) {
94          this(new LinuxSocket(fd));
95      }
96  
97      AbstractEpollStreamChannel(LinuxSocket fd) {
98          this(fd, isSoErrorZero(fd));
99      }
100 
101     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
102         super(parent, fd, true);
103         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
104         flags |= Native.EPOLLRDHUP;
105     }
106 
107     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
108         super(parent, fd, remote);
109         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
110         flags |= Native.EPOLLRDHUP;
111     }
112 
113     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
114         super(null, fd, active);
115         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
116         flags |= Native.EPOLLRDHUP;
117     }
118 
119     @Override
120     protected AbstractEpollUnsafe newUnsafe() {
121         return new EpollStreamUnsafe();
122     }
123 
124     @Override
125     public ChannelMetadata metadata() {
126         return METADATA;
127     }
128 
129     /**
130      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
131      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
132      * splice until the {@link ChannelFuture} was canceled or it was failed.
133      *
134      * Please note:
135      * <ul>
136      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
137      *   {@link IllegalArgumentException} is thrown. </li>
138      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
139      *   target {@link AbstractEpollStreamChannel}</li>
140      * </ul>
141      *
142      */
143     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
144         return spliceTo(ch, len, newPromise());
145     }
146 
147     /**
148      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
149      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
150      * splice until the {@link ChannelFuture} was canceled or it was failed.
151      *
152      * Please note:
153      * <ul>
154      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
155      *   {@link IllegalArgumentException} is thrown. </li>
156      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
157      *   target {@link AbstractEpollStreamChannel}</li>
158      * </ul>
159      *
160      */
161     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
162                                         final ChannelPromise promise) {
163         if (ch.eventLoop() != eventLoop()) {
164             throw new IllegalArgumentException("EventLoops are not the same.");
165         }
166         if (len < 0) {
167             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
168         }
169         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
170                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
171             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
172         }
173         checkNotNull(promise, "promise");
174         if (!isOpen()) {
175             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
176         } else {
177             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
178             failSpliceIfClosed(promise);
179         }
180         return promise;
181     }
182 
183     /**
184      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
185      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
186      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
187      * {@link ChannelFuture} was canceled or it was failed.
188      *
189      * Please note:
190      * <ul>
191      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
192      *   {@link AbstractEpollStreamChannel}</li>
193      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
194      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
195      * </ul>
196      */
197     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
198         return spliceTo(ch, offset, len, newPromise());
199     }
200 
201     /**
202      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
203      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
204      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
205      * {@link ChannelFuture} was canceled or it was failed.
206      *
207      * Please note:
208      * <ul>
209      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
210      *   {@link AbstractEpollStreamChannel}</li>
211      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
212      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
213      * </ul>
214      */
215     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
216                                         final ChannelPromise promise) {
217         if (len < 0) {
218             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
219         }
220         if (offset < 0) {
221             throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
222         }
223         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
224             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
225         }
226         checkNotNull(promise, "promise");
227         if (!isOpen()) {
228             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
229         } else {
230             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
231             failSpliceIfClosed(promise);
232         }
233         return promise;
234     }
235 
236     private void failSpliceIfClosed(ChannelPromise promise) {
237         if (!isOpen()) {
238             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
239             // cases where a future may not be notified otherwise.
240             if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
241                 eventLoop().execute(new Runnable() {
242                     @Override
243                     public void run() {
244                         // Call this via the EventLoop as it is a MPSC queue.
245                         clearSpliceQueue();
246                     }
247                 });
248             }
249         }
250     }
251 
252     /**
253      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
254      * @param in the collection which contains objects to write.
255      * @param buf the {@link ByteBuf} from which the bytes should be written
256      * @return The value that should be decremented from the write quantum which starts at
257      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
258      * <ul>
259      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
260      *     is encountered</li>
261      *     <li>1 - if a single call to write data was made to the OS</li>
262      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
263      *     no data was accepted</li>
264      * </ul>
265      */
266     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
267         int readableBytes = buf.readableBytes();
268         if (readableBytes == 0) {
269             in.remove();
270             return 0;
271         }
272 
273         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
274             return doWriteBytes(in, buf);
275         } else {
276             ByteBuffer[] nioBuffers = buf.nioBuffers();
277             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
278                     config().getMaxBytesPerGatheringWrite());
279         }
280     }
281 
282     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
283         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
284         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
285         // make a best effort to adjust as OS behavior changes.
286         if (attempted == written) {
287             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
288                 config().setMaxBytesPerGatheringWrite(attempted << 1);
289             }
290         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
291             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
292         }
293     }
294 
295     /**
296      * Write multiple bytes via {@link IovArray}.
297      * @param in the collection which contains objects to write.
298      * @param array The array which contains the content to write.
299      * @return The value that should be decremented from the write quantum which starts at
300      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
301      * <ul>
302      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
303      *     is encountered</li>
304      *     <li>1 - if a single call to write data was made to the OS</li>
305      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
306      *     no data was accepted</li>
307      * </ul>
308      * @throws IOException If an I/O exception occurs during write.
309      */
310     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
311         final long expectedWrittenBytes = array.size();
312         assert expectedWrittenBytes != 0;
313         final int cnt = array.count();
314         assert cnt != 0;
315 
316         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
317         if (localWrittenBytes > 0) {
318             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
319             in.removeBytes(localWrittenBytes);
320             return 1;
321         }
322         return WRITE_STATUS_SNDBUF_FULL;
323     }
324 
325     /**
326      * Write multiple bytes via {@link ByteBuffer} array.
327      * @param in the collection which contains objects to write.
328      * @param nioBuffers The buffers to write.
329      * @param nioBufferCnt The number of buffers to write.
330      * @param expectedWrittenBytes The number of bytes we expect to write.
331      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
332      * @return The value that should be decremented from the write quantum which starts at
333      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
334      * <ul>
335      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
336      *     is encountered</li>
337      *     <li>1 - if a single call to write data was made to the OS</li>
338      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
339      *     no data was accepted</li>
340      * </ul>
341      * @throws IOException If an I/O exception occurs during write.
342      */
343     private int writeBytesMultiple(
344             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
345             long maxBytesPerGatheringWrite) throws IOException {
346         assert expectedWrittenBytes != 0;
347         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
348             expectedWrittenBytes = maxBytesPerGatheringWrite;
349         }
350 
351         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
352         if (localWrittenBytes > 0) {
353             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
354             in.removeBytes(localWrittenBytes);
355             return 1;
356         }
357         return WRITE_STATUS_SNDBUF_FULL;
358     }
359 
360     /**
361      * Write a {@link DefaultFileRegion}
362      * @param in the collection which contains objects to write.
363      * @param region the {@link DefaultFileRegion} from which the bytes should be written
364      * @return The value that should be decremented from the write quantum which starts at
365      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
366      * <ul>
367      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
368      *     is encountered</li>
369      *     <li>1 - if a single call to write data was made to the OS</li>
370      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
371      *     no data was accepted</li>
372      * </ul>
373      */
374     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
375         final long regionCount = region.count();
376         if (region.transferred() >= regionCount) {
377             in.remove();
378             return 0;
379         }
380 
381         final long offset = region.transferred();
382         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
383         if (flushedAmount > 0) {
384             in.progress(flushedAmount);
385             if (region.transferred() >= regionCount) {
386                 in.remove();
387             }
388             return 1;
389         }
390         return WRITE_STATUS_SNDBUF_FULL;
391     }
392 
393     /**
394      * Write a {@link FileRegion}
395      * @param in the collection which contains objects to write.
396      * @param region the {@link FileRegion} from which the bytes should be written
397      * @return The value that should be decremented from the write quantum which starts at
398      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
399      * <ul>
400      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
401      *     is encountered</li>
402      *     <li>1 - if a single call to write data was made to the OS</li>
403      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
404      *     no data was accepted</li>
405      * </ul>
406      */
407     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
408         if (region.transferred() >= region.count()) {
409             in.remove();
410             return 0;
411         }
412 
413         if (byteChannel == null) {
414             byteChannel = new EpollSocketWritableByteChannel();
415         }
416         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
417         if (flushedAmount > 0) {
418             in.progress(flushedAmount);
419             if (region.transferred() >= region.count()) {
420                 in.remove();
421             }
422             return 1;
423         }
424         return WRITE_STATUS_SNDBUF_FULL;
425     }
426 
427     @Override
428     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
429         int writeSpinCount = config().getWriteSpinCount();
430         do {
431             final int msgCount = in.size();
432             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
433             if (msgCount > 1 && in.current() instanceof ByteBuf) {
434                 writeSpinCount -= doWriteMultiple(in);
435             } else if (msgCount == 0) {
436                 // Wrote all messages.
437                 clearFlag(Native.EPOLLOUT);
438                 // Return here so we not set the EPOLLOUT flag.
439                 return;
440             } else {  // msgCount == 1
441                 writeSpinCount -= doWriteSingle(in);
442             }
443 
444             // We do not break the loop here even if the outbound buffer was flushed completely,
445             // because a user might have triggered another write and flush when we notify his or her
446             // listeners.
447         } while (writeSpinCount > 0);
448 
449         if (writeSpinCount == 0) {
450             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
451             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
452             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
453             // and set the EPOLLOUT if necessary.
454             clearFlag(Native.EPOLLOUT);
455 
456             // We used our writeSpin quantum, and should try to write again later.
457             eventLoop().execute(flushTask);
458         } else {
459             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
460             // when it can accept more data.
461             setFlag(Native.EPOLLOUT);
462         }
463     }
464 
465     /**
466      * Attempt to write a single object.
467      * @param in the collection which contains objects to write.
468      * @return The value that should be decremented from the write quantum which starts at
469      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
470      * <ul>
471      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
472      *     is encountered</li>
473      *     <li>1 - if a single call to write data was made to the OS</li>
474      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
475      *     no data was accepted</li>
476      * </ul>
477      * @throws Exception If an I/O error occurs.
478      */
479     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
480         // The outbound buffer contains only one message or it contains a file region.
481         Object msg = in.current();
482         if (msg instanceof ByteBuf) {
483             return writeBytes(in, (ByteBuf) msg);
484         } else if (msg instanceof DefaultFileRegion) {
485             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
486         } else if (msg instanceof FileRegion) {
487             return writeFileRegion(in, (FileRegion) msg);
488         } else if (msg instanceof SpliceOutTask) {
489             if (!((SpliceOutTask) msg).spliceOut()) {
490                 return WRITE_STATUS_SNDBUF_FULL;
491             }
492             in.remove();
493             return 1;
494         } else {
495             // Should never reach here.
496             throw new Error();
497         }
498     }
499 
500     /**
501      * Attempt to write multiple {@link ByteBuf} objects.
502      * @param in the collection which contains objects to write.
503      * @return The value that should be decremented from the write quantum which starts at
504      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
505      * <ul>
506      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
507      *     is encountered</li>
508      *     <li>1 - if a single call to write data was made to the OS</li>
509      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
510      *     no data was accepted</li>
511      * </ul>
512      * @throws Exception If an I/O error occurs.
513      */
514     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
515         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
516         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
517         array.maxBytes(maxBytesPerGatheringWrite);
518         in.forEachFlushedMessage(array);
519 
520         if (array.count() >= 1) {
521             // TODO: Handle the case where cnt == 1 specially.
522             return writeBytesMultiple(in, array);
523         }
524         // cnt == 0, which means the outbound buffer contained empty buffers only.
525         in.removeBytes(0);
526         return 0;
527     }
528 
529     @Override
530     protected Object filterOutboundMessage(Object msg) {
531         if (msg instanceof ByteBuf) {
532             ByteBuf buf = (ByteBuf) msg;
533             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
534         }
535 
536         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
537             return msg;
538         }
539 
540         throw new UnsupportedOperationException(
541                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
542     }
543 
544     @UnstableApi
545     @Override
546     protected final void doShutdownOutput() throws Exception {
547         socket.shutdown(false, true);
548     }
549 
550     private void shutdownInput0(final ChannelPromise promise) {
551         try {
552             socket.shutdown(true, false);
553             promise.setSuccess();
554         } catch (Throwable cause) {
555             promise.setFailure(cause);
556         }
557     }
558 
559     @Override
560     public boolean isOutputShutdown() {
561         return socket.isOutputShutdown();
562     }
563 
564     @Override
565     public boolean isInputShutdown() {
566         return socket.isInputShutdown();
567     }
568 
569     @Override
570     public boolean isShutdown() {
571         return socket.isShutdown();
572     }
573 
574     @Override
575     public ChannelFuture shutdownOutput() {
576         return shutdownOutput(newPromise());
577     }
578 
579     @Override
580     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
581         EventLoop loop = eventLoop();
582         if (loop.inEventLoop()) {
583             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
584         } else {
585             loop.execute(new Runnable() {
586                 @Override
587                 public void run() {
588                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
589                 }
590             });
591         }
592 
593         return promise;
594     }
595 
596     @Override
597     public ChannelFuture shutdownInput() {
598         return shutdownInput(newPromise());
599     }
600 
601     @Override
602     public ChannelFuture shutdownInput(final ChannelPromise promise) {
603         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
604         if (closeExecutor != null) {
605             closeExecutor.execute(new Runnable() {
606                 @Override
607                 public void run() {
608                     shutdownInput0(promise);
609                 }
610             });
611         } else {
612             EventLoop loop = eventLoop();
613             if (loop.inEventLoop()) {
614                 shutdownInput0(promise);
615             } else {
616                 loop.execute(new Runnable() {
617                     @Override
618                     public void run() {
619                         shutdownInput0(promise);
620                     }
621                 });
622             }
623         }
624         return promise;
625     }
626 
627     @Override
628     public ChannelFuture shutdown() {
629         return shutdown(newPromise());
630     }
631 
632     @Override
633     public ChannelFuture shutdown(final ChannelPromise promise) {
634         ChannelFuture shutdownOutputFuture = shutdownOutput();
635         if (shutdownOutputFuture.isDone()) {
636             shutdownOutputDone(shutdownOutputFuture, promise);
637         } else {
638             shutdownOutputFuture.addListener(new ChannelFutureListener() {
639                 @Override
640                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
641                     shutdownOutputDone(shutdownOutputFuture, promise);
642                 }
643             });
644         }
645         return promise;
646     }
647 
648     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
649         ChannelFuture shutdownInputFuture = shutdownInput();
650         if (shutdownInputFuture.isDone()) {
651             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
652         } else {
653             shutdownInputFuture.addListener(new ChannelFutureListener() {
654                 @Override
655                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
656                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
657                 }
658             });
659         }
660     }
661 
662     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
663                               ChannelFuture shutdownInputFuture,
664                               ChannelPromise promise) {
665         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
666         Throwable shutdownInputCause = shutdownInputFuture.cause();
667         if (shutdownOutputCause != null) {
668             if (shutdownInputCause != null) {
669                 logger.debug("Exception suppressed because a previous exception occurred.",
670                         shutdownInputCause);
671             }
672             promise.setFailure(shutdownOutputCause);
673         } else if (shutdownInputCause != null) {
674             promise.setFailure(shutdownInputCause);
675         } else {
676             promise.setSuccess();
677         }
678     }
679 
680     @Override
681     protected void doClose() throws Exception {
682         try {
683             // Calling super.doClose() first so spliceTo(...) will fail on next call.
684             super.doClose();
685         } finally {
686             safeClosePipe(pipeIn);
687             safeClosePipe(pipeOut);
688             clearSpliceQueue();
689         }
690     }
691 
692     private void clearSpliceQueue() {
693         if (spliceQueue == null) {
694             return;
695         }
696         for (;;) {
697             SpliceInTask task = spliceQueue.poll();
698             if (task == null) {
699                 break;
700             }
701             task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
702         }
703     }
704 
705     private static void safeClosePipe(FileDescriptor fd) {
706         if (fd != null) {
707             try {
708                 fd.close();
709             } catch (IOException e) {
710                 if (logger.isWarnEnabled()) {
711                     logger.warn("Error while closing a pipe", e);
712                 }
713             }
714         }
715     }
716 
717     class EpollStreamUnsafe extends AbstractEpollUnsafe {
718         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
719         @Override
720         protected Executor prepareToClose() {
721             return super.prepareToClose();
722         }
723 
724         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
725                 EpollRecvByteAllocatorHandle allocHandle) {
726             if (byteBuf != null) {
727                 if (byteBuf.isReadable()) {
728                     readPending = false;
729                     pipeline.fireChannelRead(byteBuf);
730                 } else {
731                     byteBuf.release();
732                 }
733             }
734             allocHandle.readComplete();
735             pipeline.fireChannelReadComplete();
736             pipeline.fireExceptionCaught(cause);
737             if (close || cause instanceof IOException) {
738                 shutdownInput(false);
739             }
740         }
741 
742         @Override
743         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
744             return new EpollRecvByteAllocatorStreamingHandle(handle);
745         }
746 
747         @Override
748         void epollInReady() {
749             final ChannelConfig config = config();
750             if (shouldBreakEpollInReady(config)) {
751                 clearEpollIn0();
752                 return;
753             }
754             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
755             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
756 
757             final ChannelPipeline pipeline = pipeline();
758             final ByteBufAllocator allocator = config.getAllocator();
759             allocHandle.reset(config);
760             epollInBefore();
761 
762             ByteBuf byteBuf = null;
763             boolean close = false;
764             try {
765                 do {
766                     if (spliceQueue != null) {
767                         SpliceInTask spliceTask = spliceQueue.peek();
768                         if (spliceTask != null) {
769                             if (spliceTask.spliceIn(allocHandle)) {
770                                 // We need to check if it is still active as if not we removed all SpliceTasks in
771                                 // doClose(...)
772                                 if (isActive()) {
773                                     spliceQueue.remove();
774                                 }
775                                 continue;
776                             } else {
777                                 break;
778                             }
779                         }
780                     }
781 
782                     // we use a direct buffer here as the native implementations only be able
783                     // to handle direct buffers.
784                     byteBuf = allocHandle.allocate(allocator);
785                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
786                     if (allocHandle.lastBytesRead() <= 0) {
787                         // nothing was read, release the buffer.
788                         byteBuf.release();
789                         byteBuf = null;
790                         close = allocHandle.lastBytesRead() < 0;
791                         if (close) {
792                             // There is nothing left to read as we received an EOF.
793                             readPending = false;
794                         }
795                         break;
796                     }
797                     allocHandle.incMessagesRead(1);
798                     readPending = false;
799                     pipeline.fireChannelRead(byteBuf);
800                     byteBuf = null;
801 
802                     if (shouldBreakEpollInReady(config)) {
803                         // We need to do this for two reasons:
804                         //
805                         // - If the input was shutdown in between (which may be the case when the user did it in the
806                         //   fireChannelRead(...) method we should not try to read again to not produce any
807                         //   miss-leading exceptions.
808                         //
809                         // - If the user closes the channel we need to ensure we not try to read from it again as
810                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
811                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
812                         //   reading data from a filedescriptor that belongs to another socket then the socket that
813                         //   was "wrapped" by this Channel implementation.
814                         break;
815                     }
816                 } while (allocHandle.continueReading());
817 
818                 allocHandle.readComplete();
819                 pipeline.fireChannelReadComplete();
820 
821                 if (close) {
822                     shutdownInput(false);
823                 }
824             } catch (Throwable t) {
825                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
826             } finally {
827                 epollInFinally(config);
828             }
829         }
830     }
831 
832     private void addToSpliceQueue(final SpliceInTask task) {
833         EventLoop eventLoop = eventLoop();
834         if (eventLoop.inEventLoop()) {
835             addToSpliceQueue0(task);
836         } else {
837             eventLoop.execute(new Runnable() {
838                 @Override
839                 public void run() {
840                     addToSpliceQueue0(task);
841                 }
842             });
843         }
844     }
845 
846     private void addToSpliceQueue0(SpliceInTask task) {
847         if (spliceQueue == null) {
848             spliceQueue = PlatformDependent.newMpscQueue();
849         }
850         spliceQueue.add(task);
851     }
852 
853     protected abstract class SpliceInTask {
854         final ChannelPromise promise;
855         int len;
856 
857         protected SpliceInTask(int len, ChannelPromise promise) {
858             this.promise = promise;
859             this.len = len;
860         }
861 
862         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
863 
864         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
865             // calculate the maximum amount of data we are allowed to splice
866             int length = Math.min(handle.guess(), len);
867             int splicedIn = 0;
868             for (;;) {
869                 // Splicing until there is nothing left to splice.
870                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
871                 if (localSplicedIn == 0) {
872                     break;
873                 }
874                 splicedIn += localSplicedIn;
875                 length -= localSplicedIn;
876             }
877 
878             return splicedIn;
879         }
880     }
881 
882     // Let it directly implement channelFutureListener as well to reduce object creation.
883     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
884         private final AbstractEpollStreamChannel ch;
885 
886         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
887             super(len, promise);
888             this.ch = ch;
889         }
890 
891         @Override
892         public void operationComplete(ChannelFuture future) throws Exception {
893             if (!future.isSuccess()) {
894                 promise.setFailure(future.cause());
895             }
896         }
897 
898         @Override
899         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
900             assert ch.eventLoop().inEventLoop();
901             if (len == 0) {
902                 promise.setSuccess();
903                 return true;
904             }
905             try {
906                 // We create the pipe on the target channel as this will allow us to just handle pending writes
907                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
908                 // on multiple Channels pointing to one target Channel.
909                 FileDescriptor pipeOut = ch.pipeOut;
910                 if (pipeOut == null) {
911                     // Create a new pipe as non was created before.
912                     FileDescriptor[] pipe = pipe();
913                     ch.pipeIn = pipe[0];
914                     pipeOut = ch.pipeOut = pipe[1];
915                 }
916 
917                 int splicedIn = spliceIn(pipeOut, handle);
918                 if (splicedIn > 0) {
919                     // Integer.MAX_VALUE is a special value which will result in splice forever.
920                     if (len != Integer.MAX_VALUE) {
921                         len -= splicedIn;
922                     }
923 
924                     // Depending on if we are done with splicing inbound data we set the right promise for the
925                     // outbound splicing.
926                     final ChannelPromise splicePromise;
927                     if (len == 0) {
928                         splicePromise = promise;
929                     } else {
930                         splicePromise = ch.newPromise().addListener(this);
931                     }
932 
933                     boolean autoRead = config().isAutoRead();
934 
935                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
936                     // case.
937                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
938                     ch.unsafe().flush();
939                     if (autoRead && !splicePromise.isDone()) {
940                         // Write was not done which means the target channel was not writable. In this case we need to
941                         // disable reading until we are done with splicing to the target channel because:
942                         //
943                         // - The user may want to to trigger another splice operation once the splicing was complete.
944                         config().setAutoRead(false);
945                     }
946                 }
947 
948                 return len == 0;
949             } catch (Throwable cause) {
950                 promise.setFailure(cause);
951                 return true;
952             }
953         }
954     }
955 
956     private final class SpliceOutTask {
957         private final AbstractEpollStreamChannel ch;
958         private final boolean autoRead;
959         private int len;
960 
961         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
962             this.ch = ch;
963             this.len = len;
964             this.autoRead = autoRead;
965         }
966 
967         public boolean spliceOut() throws Exception {
968             assert ch.eventLoop().inEventLoop();
969             try {
970                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
971                 len -= splicedOut;
972                 if (len == 0) {
973                     if (autoRead) {
974                         // AutoRead was used and we spliced everything so start reading again
975                         config().setAutoRead(true);
976                     }
977                     return true;
978                 }
979                 return false;
980             } catch (IOException e) {
981                 if (autoRead) {
982                     // AutoRead was used and we spliced everything so start reading again
983                     config().setAutoRead(true);
984                 }
985                 throw e;
986             }
987         }
988     }
989 
990     private final class SpliceFdTask extends SpliceInTask {
991         private final FileDescriptor fd;
992         private final ChannelPromise promise;
993         private final int offset;
994 
995         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
996             super(len, promise);
997             this.fd = fd;
998             this.promise = promise;
999             this.offset = offset;
1000         }
1001 
1002         @Override
1003         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1004             assert eventLoop().inEventLoop();
1005             if (len == 0) {
1006                 promise.setSuccess();
1007                 return true;
1008             }
1009 
1010             try {
1011                 FileDescriptor[] pipe = pipe();
1012                 FileDescriptor pipeIn = pipe[0];
1013                 FileDescriptor pipeOut = pipe[1];
1014                 try {
1015                     int splicedIn = spliceIn(pipeOut, handle);
1016                     if (splicedIn > 0) {
1017                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1018                         if (len != Integer.MAX_VALUE) {
1019                             len -= splicedIn;
1020                         }
1021                         do {
1022                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1023                             splicedIn -= splicedOut;
1024                         } while (splicedIn > 0);
1025                         if (len == 0) {
1026                             promise.setSuccess();
1027                             return true;
1028                         }
1029                     }
1030                     return false;
1031                 } finally {
1032                     safeClosePipe(pipeIn);
1033                     safeClosePipe(pipeOut);
1034                 }
1035             } catch (Throwable cause) {
1036                 promise.setFailure(cause);
1037                 return true;
1038             }
1039         }
1040     }
1041 
1042     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1043         EpollSocketWritableByteChannel() {
1044             super(socket);
1045         }
1046 
1047         @Override
1048         protected ByteBufAllocator alloc() {
1049             return AbstractEpollStreamChannel.this.alloc();
1050         }
1051     }
1052 }