View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.ThrowableUtil;
41  import io.netty.util.internal.UnstableApi;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.ClosedChannelException;
49  import java.nio.channels.WritableByteChannel;
50  import java.util.Queue;
51  import java.util.concurrent.Executor;
52  
53  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.FileDescriptor.pipe;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
    // Metadata shared by every stream channel; returned from metadata().
    // NOTE(review): the arguments are presumably (hasDisconnect, defaultMaxMessagesPerRead) — confirm against
    // ChannelMetadata.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Suffix appended to the UnsupportedOperationException message thrown by filterOutboundMessage(...).
    // NOTE(review): filterOutboundMessage(...) accepts any FileRegion, not only DefaultFileRegion, so this text
    // under-reports what is supported.
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
    // Pre-allocated ClosedChannelException instances used to fail splice promises, one per call site.
    // NOTE(review): ThrowableUtil.unknownStackTrace presumably replaces the stack trace with a synthetic frame
    // naming the given class/method, so the origin is still identifiable — confirm.
    private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
                    AbstractEpollStreamChannel.class, "clearSpliceQueue()");
    private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(),
            AbstractEpollStreamChannel.class, "spliceTo(...)");
    private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
            AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
    // Scheduled by doWrite(...) on the event loop when the write spin quantum is used up, so that writing
    // continues later without starving other tasks.
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
            // meantime.
            ((AbstractEpollUnsafe) unsafe()).flush0();
        }
    };
    // Pending splice-in tasks. May be null when no splice was ever requested (clearSpliceQueue() checks for that).
    private Queue<SpliceInTask> spliceQueue;

    // Lazy init these if we need to splice(...). Both are closed in doClose().
    private FileDescriptor pipeIn;
    private FileDescriptor pipeOut;

    // Lazily created by writeFileRegion(...); adapts this socket for FileRegion.transferTo(...).
    private WritableByteChannel byteChannel;
88  
89      protected AbstractEpollStreamChannel(Channel parent, int fd) {
90          this(parent, new LinuxSocket(fd));
91      }
92  
93      protected AbstractEpollStreamChannel(int fd) {
94          this(new LinuxSocket(fd));
95      }
96  
97      AbstractEpollStreamChannel(LinuxSocket fd) {
98          this(fd, isSoErrorZero(fd));
99      }
100 
101     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
102         super(parent, fd, true);
103         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
104         flags |= Native.EPOLLRDHUP;
105     }
106 
107     AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
108         super(parent, fd, remote);
109         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
110         flags |= Native.EPOLLRDHUP;
111     }
112 
113     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
114         super(null, fd, active);
115         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
116         flags |= Native.EPOLLRDHUP;
117     }
118 
119     @Override
120     protected AbstractEpollUnsafe newUnsafe() {
121         return new EpollStreamUnsafe();
122     }
123 
124     @Override
125     public ChannelMetadata metadata() {
126         return METADATA;
127     }
128 
129     /**
130      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
131      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
132      * splice until the {@link ChannelFuture} was canceled or it was failed.
133      *
134      * Please note:
135      * <ul>
136      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
137      *   {@link IllegalArgumentException} is thrown. </li>
138      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
139      *   target {@link AbstractEpollStreamChannel}</li>
140      * </ul>
141      *
142      */
143     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
144         return spliceTo(ch, len, newPromise());
145     }
146 
147     /**
148      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
149      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
150      * splice until the {@link ChannelFuture} was canceled or it was failed.
151      *
152      * Please note:
153      * <ul>
154      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
155      *   {@link IllegalArgumentException} is thrown. </li>
156      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
157      *   target {@link AbstractEpollStreamChannel}</li>
158      * </ul>
159      *
160      */
161     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
162                                         final ChannelPromise promise) {
163         if (ch.eventLoop() != eventLoop()) {
164             throw new IllegalArgumentException("EventLoops are not the same.");
165         }
166         if (len < 0) {
167             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
168         }
169         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
170                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
171             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
172         }
173         checkNotNull(promise, "promise");
174         if (!isOpen()) {
175             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
176         } else {
177             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
178             failSpliceIfClosed(promise);
179         }
180         return promise;
181     }
182 
183     /**
184      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
185      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
186      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
187      * {@link ChannelFuture} was canceled or it was failed.
188      *
189      * Please note:
190      * <ul>
191      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
192      *   {@link AbstractEpollStreamChannel}</li>
193      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
194      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
195      * </ul>
196      */
197     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
198         return spliceTo(ch, offset, len, newPromise());
199     }
200 
201     /**
202      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
203      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
204      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
205      * {@link ChannelFuture} was canceled or it was failed.
206      *
207      * Please note:
208      * <ul>
209      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
210      *   {@link AbstractEpollStreamChannel}</li>
211      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
212      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
213      * </ul>
214      */
215     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
216                                         final ChannelPromise promise) {
217         if (len < 0) {
218             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
219         }
220         if (offset < 0) {
221             throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
222         }
223         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
224             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
225         }
226         checkNotNull(promise, "promise");
227         if (!isOpen()) {
228             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
229         } else {
230             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
231             failSpliceIfClosed(promise);
232         }
233         return promise;
234     }
235 
236     private void failSpliceIfClosed(ChannelPromise promise) {
237         if (!isOpen()) {
238             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
239             // cases where a future may not be notified otherwise.
240             if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
241                 eventLoop().execute(new Runnable() {
242                     @Override
243                     public void run() {
244                         // Call this via the EventLoop as it is a MPSC queue.
245                         clearSpliceQueue();
246                     }
247                 });
248             }
249         }
250     }
251 
252     /**
253      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
254      * @param in the collection which contains objects to write.
255      * @param buf the {@link ByteBuf} from which the bytes should be written
256      * @return The value that should be decremented from the write quantum which starts at
257      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
258      * <ul>
259      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
260      *     is encountered</li>
261      *     <li>1 - if a single call to write data was made to the OS</li>
262      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
263      *     no data was accepted</li>
264      * </ul>
265      */
266     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
267         int readableBytes = buf.readableBytes();
268         if (readableBytes == 0) {
269             in.remove();
270             return 0;
271         }
272 
273         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
274             return doWriteBytes(in, buf);
275         } else {
276             ByteBuffer[] nioBuffers = buf.nioBuffers();
277             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
278                     config().getMaxBytesPerGatheringWrite());
279         }
280     }
281 
282     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
283         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
284         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
285         // make a best effort to adjust as OS behavior changes.
286         if (attempted == written) {
287             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
288                 config().setMaxBytesPerGatheringWrite(attempted << 1);
289             }
290         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
291             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
292         }
293     }
294 
295     /**
296      * Write multiple bytes via {@link IovArray}.
297      * @param in the collection which contains objects to write.
298      * @param array The array which contains the content to write.
299      * @return The value that should be decremented from the write quantum which starts at
300      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
301      * <ul>
302      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
303      *     is encountered</li>
304      *     <li>1 - if a single call to write data was made to the OS</li>
305      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
306      *     no data was accepted</li>
307      * </ul>
308      * @throws IOException If an I/O exception occurs during write.
309      */
310     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
311         final long expectedWrittenBytes = array.size();
312         assert expectedWrittenBytes != 0;
313         final int cnt = array.count();
314         assert cnt != 0;
315 
316         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
317         if (localWrittenBytes > 0) {
318             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
319             in.removeBytes(localWrittenBytes);
320             return 1;
321         }
322         return WRITE_STATUS_SNDBUF_FULL;
323     }
324 
325     /**
326      * Write multiple bytes via {@link ByteBuffer} array.
327      * @param in the collection which contains objects to write.
328      * @param nioBuffers The buffers to write.
329      * @param nioBufferCnt The number of buffers to write.
330      * @param expectedWrittenBytes The number of bytes we expect to write.
331      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
332      * @return The value that should be decremented from the write quantum which starts at
333      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
334      * <ul>
335      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
336      *     is encountered</li>
337      *     <li>1 - if a single call to write data was made to the OS</li>
338      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
339      *     no data was accepted</li>
340      * </ul>
341      * @throws IOException If an I/O exception occurs during write.
342      */
343     private int writeBytesMultiple(
344             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
345             long maxBytesPerGatheringWrite) throws IOException {
346         assert expectedWrittenBytes != 0;
347         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
348             expectedWrittenBytes = maxBytesPerGatheringWrite;
349         }
350 
351         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
352         if (localWrittenBytes > 0) {
353             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
354             in.removeBytes(localWrittenBytes);
355             return 1;
356         }
357         return WRITE_STATUS_SNDBUF_FULL;
358     }
359 
360     /**
361      * Write a {@link DefaultFileRegion}
362      * @param in the collection which contains objects to write.
363      * @param region the {@link DefaultFileRegion} from which the bytes should be written
364      * @return The value that should be decremented from the write quantum which starts at
365      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
366      * <ul>
367      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
368      *     is encountered</li>
369      *     <li>1 - if a single call to write data was made to the OS</li>
370      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
371      *     no data was accepted</li>
372      * </ul>
373      */
374     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
375         final long regionCount = region.count();
376         if (region.transferred() >= regionCount) {
377             in.remove();
378             return 0;
379         }
380 
381         final long offset = region.transferred();
382         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
383         if (flushedAmount > 0) {
384             in.progress(flushedAmount);
385             if (region.transferred() >= regionCount) {
386                 in.remove();
387             }
388             return 1;
389         }
390         return WRITE_STATUS_SNDBUF_FULL;
391     }
392 
393     /**
394      * Write a {@link FileRegion}
395      * @param in the collection which contains objects to write.
396      * @param region the {@link FileRegion} from which the bytes should be written
397      * @return The value that should be decremented from the write quantum which starts at
398      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
399      * <ul>
400      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
401      *     is encountered</li>
402      *     <li>1 - if a single call to write data was made to the OS</li>
403      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
404      *     no data was accepted</li>
405      * </ul>
406      */
407     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
408         if (region.transferred() >= region.count()) {
409             in.remove();
410             return 0;
411         }
412 
413         if (byteChannel == null) {
414             byteChannel = new EpollSocketWritableByteChannel();
415         }
416         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
417         if (flushedAmount > 0) {
418             in.progress(flushedAmount);
419             if (region.transferred() >= region.count()) {
420                 in.remove();
421             }
422             return 1;
423         }
424         return WRITE_STATUS_SNDBUF_FULL;
425     }
426 
427     @Override
428     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
429         int writeSpinCount = config().getWriteSpinCount();
430         do {
431             final int msgCount = in.size();
432             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
433             if (msgCount > 1 && in.current() instanceof ByteBuf) {
434                 writeSpinCount -= doWriteMultiple(in);
435             } else if (msgCount == 0) {
436                 // Wrote all messages.
437                 clearFlag(Native.EPOLLOUT);
438                 // Return here so we not set the EPOLLOUT flag.
439                 return;
440             } else {  // msgCount == 1
441                 writeSpinCount -= doWriteSingle(in);
442             }
443 
444             // We do not break the loop here even if the outbound buffer was flushed completely,
445             // because a user might have triggered another write and flush when we notify his or her
446             // listeners.
447         } while (writeSpinCount > 0);
448 
449         if (writeSpinCount == 0) {
450             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
451             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
452             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
453             // and set the EPOLLOUT if necessary.
454             clearFlag(Native.EPOLLOUT);
455 
456             // We used our writeSpin quantum, and should try to write again later.
457             eventLoop().execute(flushTask);
458         } else {
459             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
460             // when it can accept more data.
461             setFlag(Native.EPOLLOUT);
462         }
463     }
464 
465     /**
466      * Attempt to write a single object.
467      * @param in the collection which contains objects to write.
468      * @return The value that should be decremented from the write quantum which starts at
469      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
470      * <ul>
471      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
472      *     is encountered</li>
473      *     <li>1 - if a single call to write data was made to the OS</li>
474      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
475      *     no data was accepted</li>
476      * </ul>
477      * @throws Exception If an I/O error occurs.
478      */
479     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
480         // The outbound buffer contains only one message or it contains a file region.
481         Object msg = in.current();
482         if (msg instanceof ByteBuf) {
483             return writeBytes(in, (ByteBuf) msg);
484         } else if (msg instanceof DefaultFileRegion) {
485             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
486         } else if (msg instanceof FileRegion) {
487             return writeFileRegion(in, (FileRegion) msg);
488         } else if (msg instanceof SpliceOutTask) {
489             if (!((SpliceOutTask) msg).spliceOut()) {
490                 return WRITE_STATUS_SNDBUF_FULL;
491             }
492             in.remove();
493             return 1;
494         } else {
495             // Should never reach here.
496             throw new Error();
497         }
498     }
499 
500     /**
501      * Attempt to write multiple {@link ByteBuf} objects.
502      * @param in the collection which contains objects to write.
503      * @return The value that should be decremented from the write quantum which starts at
504      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
505      * <ul>
506      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
507      *     is encountered</li>
508      *     <li>1 - if a single call to write data was made to the OS</li>
509      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
510      *     no data was accepted</li>
511      * </ul>
512      * @throws Exception If an I/O error occurs.
513      */
514     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
515         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
516         if (PlatformDependent.hasUnsafe()) {
517             IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
518             array.maxBytes(maxBytesPerGatheringWrite);
519             in.forEachFlushedMessage(array);
520 
521             if (array.count() >= 1) {
522                 // TODO: Handle the case where cnt == 1 specially.
523                 return writeBytesMultiple(in, array);
524             }
525         } else {
526             ByteBuffer[] buffers = in.nioBuffers();
527             int cnt = in.nioBufferCount();
528             if (cnt >= 1) {
529                 // TODO: Handle the case where cnt == 1 specially.
530                 return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
531             }
532         }
533         // cnt == 0, which means the outbound buffer contained empty buffers only.
534         in.removeBytes(0);
535         return 0;
536     }
537 
538     @Override
539     protected Object filterOutboundMessage(Object msg) {
540         if (msg instanceof ByteBuf) {
541             ByteBuf buf = (ByteBuf) msg;
542             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
543         }
544 
545         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
546             return msg;
547         }
548 
549         throw new UnsupportedOperationException(
550                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
551     }
552 
553     @UnstableApi
554     @Override
555     protected final void doShutdownOutput() throws Exception {
556         socket.shutdown(false, true);
557     }
558 
559     private void shutdownInput0(final ChannelPromise promise) {
560         try {
561             socket.shutdown(true, false);
562             promise.setSuccess();
563         } catch (Throwable cause) {
564             promise.setFailure(cause);
565         }
566     }
567 
568     @Override
569     public boolean isOutputShutdown() {
570         return socket.isOutputShutdown();
571     }
572 
573     @Override
574     public boolean isInputShutdown() {
575         return socket.isInputShutdown();
576     }
577 
578     @Override
579     public boolean isShutdown() {
580         return socket.isShutdown();
581     }
582 
583     @Override
584     public ChannelFuture shutdownOutput() {
585         return shutdownOutput(newPromise());
586     }
587 
588     @Override
589     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
590         EventLoop loop = eventLoop();
591         if (loop.inEventLoop()) {
592             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
593         } else {
594             loop.execute(new Runnable() {
595                 @Override
596                 public void run() {
597                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
598                 }
599             });
600         }
601 
602         return promise;
603     }
604 
605     @Override
606     public ChannelFuture shutdownInput() {
607         return shutdownInput(newPromise());
608     }
609 
610     @Override
611     public ChannelFuture shutdownInput(final ChannelPromise promise) {
612         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
613         if (closeExecutor != null) {
614             closeExecutor.execute(new Runnable() {
615                 @Override
616                 public void run() {
617                     shutdownInput0(promise);
618                 }
619             });
620         } else {
621             EventLoop loop = eventLoop();
622             if (loop.inEventLoop()) {
623                 shutdownInput0(promise);
624             } else {
625                 loop.execute(new Runnable() {
626                     @Override
627                     public void run() {
628                         shutdownInput0(promise);
629                     }
630                 });
631             }
632         }
633         return promise;
634     }
635 
636     @Override
637     public ChannelFuture shutdown() {
638         return shutdown(newPromise());
639     }
640 
641     @Override
642     public ChannelFuture shutdown(final ChannelPromise promise) {
643         ChannelFuture shutdownOutputFuture = shutdownOutput();
644         if (shutdownOutputFuture.isDone()) {
645             shutdownOutputDone(shutdownOutputFuture, promise);
646         } else {
647             shutdownOutputFuture.addListener(new ChannelFutureListener() {
648                 @Override
649                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
650                     shutdownOutputDone(shutdownOutputFuture, promise);
651                 }
652             });
653         }
654         return promise;
655     }
656 
657     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
658         ChannelFuture shutdownInputFuture = shutdownInput();
659         if (shutdownInputFuture.isDone()) {
660             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
661         } else {
662             shutdownInputFuture.addListener(new ChannelFutureListener() {
663                 @Override
664                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
665                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
666                 }
667             });
668         }
669     }
670 
671     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
672                               ChannelFuture shutdownInputFuture,
673                               ChannelPromise promise) {
674         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
675         Throwable shutdownInputCause = shutdownInputFuture.cause();
676         if (shutdownOutputCause != null) {
677             if (shutdownInputCause != null) {
678                 logger.debug("Exception suppressed because a previous exception occurred.",
679                         shutdownInputCause);
680             }
681             promise.setFailure(shutdownOutputCause);
682         } else if (shutdownInputCause != null) {
683             promise.setFailure(shutdownInputCause);
684         } else {
685             promise.setSuccess();
686         }
687     }
688 
689     @Override
690     protected void doClose() throws Exception {
691         try {
692             // Calling super.doClose() first so spliceTo(...) will fail on next call.
693             super.doClose();
694         } finally {
695             safeClosePipe(pipeIn);
696             safeClosePipe(pipeOut);
697             clearSpliceQueue();
698         }
699     }
700 
701     private void clearSpliceQueue() {
702         if (spliceQueue == null) {
703             return;
704         }
705         for (;;) {
706             SpliceInTask task = spliceQueue.poll();
707             if (task == null) {
708                 break;
709             }
710             task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
711         }
712     }
713 
714     private static void safeClosePipe(FileDescriptor fd) {
715         if (fd != null) {
716             try {
717                 fd.close();
718             } catch (IOException e) {
719                 if (logger.isWarnEnabled()) {
720                     logger.warn("Error while closing a pipe", e);
721                 }
722             }
723         }
724     }
725 
726     class EpollStreamUnsafe extends AbstractEpollUnsafe {
727         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
728         @Override
729         protected Executor prepareToClose() {
730             return super.prepareToClose();
731         }
732 
733         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
734                 EpollRecvByteAllocatorHandle allocHandle) {
735             if (byteBuf != null) {
736                 if (byteBuf.isReadable()) {
737                     readPending = false;
738                     pipeline.fireChannelRead(byteBuf);
739                 } else {
740                     byteBuf.release();
741                 }
742             }
743             allocHandle.readComplete();
744             pipeline.fireChannelReadComplete();
745             pipeline.fireExceptionCaught(cause);
746             if (close || cause instanceof IOException) {
747                 shutdownInput(false);
748             }
749         }
750 
751         @Override
752         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
753             return new EpollRecvByteAllocatorStreamingHandle(handle);
754         }
755 
756         @Override
757         void epollInReady() {
758             final ChannelConfig config = config();
759             if (shouldBreakEpollInReady(config)) {
760                 clearEpollIn0();
761                 return;
762             }
763             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
764             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
765 
766             final ChannelPipeline pipeline = pipeline();
767             final ByteBufAllocator allocator = config.getAllocator();
768             allocHandle.reset(config);
769             epollInBefore();
770 
771             ByteBuf byteBuf = null;
772             boolean close = false;
773             try {
774                 do {
775                     if (spliceQueue != null) {
776                         SpliceInTask spliceTask = spliceQueue.peek();
777                         if (spliceTask != null) {
778                             if (spliceTask.spliceIn(allocHandle)) {
779                                 // We need to check if it is still active as if not we removed all SpliceTasks in
780                                 // doClose(...)
781                                 if (isActive()) {
782                                     spliceQueue.remove();
783                                 }
784                                 continue;
785                             } else {
786                                 break;
787                             }
788                         }
789                     }
790 
791                     // we use a direct buffer here as the native implementations only be able
792                     // to handle direct buffers.
793                     byteBuf = allocHandle.allocate(allocator);
794                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
795                     if (allocHandle.lastBytesRead() <= 0) {
796                         // nothing was read, release the buffer.
797                         byteBuf.release();
798                         byteBuf = null;
799                         close = allocHandle.lastBytesRead() < 0;
800                         if (close) {
801                             // There is nothing left to read as we received an EOF.
802                             readPending = false;
803                         }
804                         break;
805                     }
806                     allocHandle.incMessagesRead(1);
807                     readPending = false;
808                     pipeline.fireChannelRead(byteBuf);
809                     byteBuf = null;
810 
811                     if (shouldBreakEpollInReady(config)) {
812                         // We need to do this for two reasons:
813                         //
814                         // - If the input was shutdown in between (which may be the case when the user did it in the
815                         //   fireChannelRead(...) method we should not try to read again to not produce any
816                         //   miss-leading exceptions.
817                         //
818                         // - If the user closes the channel we need to ensure we not try to read from it again as
819                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
820                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
821                         //   reading data from a filedescriptor that belongs to another socket then the socket that
822                         //   was "wrapped" by this Channel implementation.
823                         break;
824                     }
825                 } while (allocHandle.continueReading());
826 
827                 allocHandle.readComplete();
828                 pipeline.fireChannelReadComplete();
829 
830                 if (close) {
831                     shutdownInput(false);
832                 }
833             } catch (Throwable t) {
834                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
835             } finally {
836                 epollInFinally(config);
837             }
838         }
839     }
840 
841     private void addToSpliceQueue(final SpliceInTask task) {
842         EventLoop eventLoop = eventLoop();
843         if (eventLoop.inEventLoop()) {
844             addToSpliceQueue0(task);
845         } else {
846             eventLoop.execute(new Runnable() {
847                 @Override
848                 public void run() {
849                     addToSpliceQueue0(task);
850                 }
851             });
852         }
853     }
854 
855     private void addToSpliceQueue0(SpliceInTask task) {
856         if (spliceQueue == null) {
857             spliceQueue = PlatformDependent.newMpscQueue();
858         }
859         spliceQueue.add(task);
860     }
861 
862     protected abstract class SpliceInTask {
863         final ChannelPromise promise;
864         int len;
865 
866         protected SpliceInTask(int len, ChannelPromise promise) {
867             this.promise = promise;
868             this.len = len;
869         }
870 
871         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
872 
873         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
874             // calculate the maximum amount of data we are allowed to splice
875             int length = Math.min(handle.guess(), len);
876             int splicedIn = 0;
877             for (;;) {
878                 // Splicing until there is nothing left to splice.
879                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
880                 if (localSplicedIn == 0) {
881                     break;
882                 }
883                 splicedIn += localSplicedIn;
884                 length -= localSplicedIn;
885             }
886 
887             return splicedIn;
888         }
889     }
890 
891     // Let it directly implement channelFutureListener as well to reduce object creation.
892     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
893         private final AbstractEpollStreamChannel ch;
894 
895         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
896             super(len, promise);
897             this.ch = ch;
898         }
899 
900         @Override
901         public void operationComplete(ChannelFuture future) throws Exception {
902             if (!future.isSuccess()) {
903                 promise.setFailure(future.cause());
904             }
905         }
906 
907         @Override
908         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
909             assert ch.eventLoop().inEventLoop();
910             if (len == 0) {
911                 promise.setSuccess();
912                 return true;
913             }
914             try {
915                 // We create the pipe on the target channel as this will allow us to just handle pending writes
916                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
917                 // on multiple Channels pointing to one target Channel.
918                 FileDescriptor pipeOut = ch.pipeOut;
919                 if (pipeOut == null) {
920                     // Create a new pipe as non was created before.
921                     FileDescriptor[] pipe = pipe();
922                     ch.pipeIn = pipe[0];
923                     pipeOut = ch.pipeOut = pipe[1];
924                 }
925 
926                 int splicedIn = spliceIn(pipeOut, handle);
927                 if (splicedIn > 0) {
928                     // Integer.MAX_VALUE is a special value which will result in splice forever.
929                     if (len != Integer.MAX_VALUE) {
930                         len -= splicedIn;
931                     }
932 
933                     // Depending on if we are done with splicing inbound data we set the right promise for the
934                     // outbound splicing.
935                     final ChannelPromise splicePromise;
936                     if (len == 0) {
937                         splicePromise = promise;
938                     } else {
939                         splicePromise = ch.newPromise().addListener(this);
940                     }
941 
942                     boolean autoRead = config().isAutoRead();
943 
944                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
945                     // case.
946                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
947                     ch.unsafe().flush();
948                     if (autoRead && !splicePromise.isDone()) {
949                         // Write was not done which means the target channel was not writable. In this case we need to
950                         // disable reading until we are done with splicing to the target channel because:
951                         //
952                         // - The user may want to to trigger another splice operation once the splicing was complete.
953                         config().setAutoRead(false);
954                     }
955                 }
956 
957                 return len == 0;
958             } catch (Throwable cause) {
959                 promise.setFailure(cause);
960                 return true;
961             }
962         }
963     }
964 
965     private final class SpliceOutTask {
966         private final AbstractEpollStreamChannel ch;
967         private final boolean autoRead;
968         private int len;
969 
970         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
971             this.ch = ch;
972             this.len = len;
973             this.autoRead = autoRead;
974         }
975 
976         public boolean spliceOut() throws Exception {
977             assert ch.eventLoop().inEventLoop();
978             try {
979                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
980                 len -= splicedOut;
981                 if (len == 0) {
982                     if (autoRead) {
983                         // AutoRead was used and we spliced everything so start reading again
984                         config().setAutoRead(true);
985                     }
986                     return true;
987                 }
988                 return false;
989             } catch (IOException e) {
990                 if (autoRead) {
991                     // AutoRead was used and we spliced everything so start reading again
992                     config().setAutoRead(true);
993                 }
994                 throw e;
995             }
996         }
997     }
998 
999     private final class SpliceFdTask extends SpliceInTask {
1000         private final FileDescriptor fd;
1001         private final ChannelPromise promise;
1002         private final int offset;
1003 
1004         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1005             super(len, promise);
1006             this.fd = fd;
1007             this.promise = promise;
1008             this.offset = offset;
1009         }
1010 
1011         @Override
1012         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1013             assert eventLoop().inEventLoop();
1014             if (len == 0) {
1015                 promise.setSuccess();
1016                 return true;
1017             }
1018 
1019             try {
1020                 FileDescriptor[] pipe = pipe();
1021                 FileDescriptor pipeIn = pipe[0];
1022                 FileDescriptor pipeOut = pipe[1];
1023                 try {
1024                     int splicedIn = spliceIn(pipeOut, handle);
1025                     if (splicedIn > 0) {
1026                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1027                         if (len != Integer.MAX_VALUE) {
1028                             len -= splicedIn;
1029                         }
1030                         do {
1031                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1032                             splicedIn -= splicedOut;
1033                         } while (splicedIn > 0);
1034                         if (len == 0) {
1035                             promise.setSuccess();
1036                             return true;
1037                         }
1038                     }
1039                     return false;
1040                 } finally {
1041                     safeClosePipe(pipeIn);
1042                     safeClosePipe(pipeOut);
1043                 }
1044             } catch (Throwable cause) {
1045                 promise.setFailure(cause);
1046                 return true;
1047             }
1048         }
1049     }
1050 
1051     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1052         EpollSocketWritableByteChannel() {
1053             super(socket);
1054         }
1055 
1056         @Override
1057         protected ByteBufAllocator alloc() {
1058             return AbstractEpollStreamChannel.this.alloc();
1059         }
1060     }
1061 }