View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.ThrowableUtil;
41  import io.netty.util.internal.UnstableApi;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.ClosedChannelException;
49  import java.nio.channels.WritableByteChannel;
50  import java.util.Queue;
51  import java.util.concurrent.Executor;
52  
53  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.FileDescriptor.pipe;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60      private static final String EXPECTED_TYPES =
61              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64      private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
65              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
66                      AbstractEpollStreamChannel.class, "clearSpliceQueue()");
67      private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
68              new ClosedChannelException(),
69              AbstractEpollStreamChannel.class, "spliceTo(...)");
70      private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
71              ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
72              AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
73      private final Runnable flushTask = new Runnable() {
74          @Override
75          public void run() {
76              flush();
77          }
78      };
79      private Queue<SpliceInTask> spliceQueue;
80  
81      // Lazy init these if we need to splice(...)
82      private FileDescriptor pipeIn;
83      private FileDescriptor pipeOut;
84  
85      private WritableByteChannel byteChannel;
86  
/**
 * Wraps the raw descriptor {@code fd} in a new {@link LinuxSocket} and creates a channel owned by {@code parent}.
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

/**
 * Wraps the raw descriptor {@code fd} in a new {@link LinuxSocket} and creates a channel without a parent.
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

/**
 * Creates a parentless channel whose initial {@code active} state is the result of {@code isSoErrorZero(fd)}
 * (presumably: active when the socket reports no pending error).
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}

/**
 * Creates a channel owned by {@code parent} that is marked active from the start.
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, Native.EPOLLIN, true);
    // Also ask for EPOLLRDHUP so the channel is told when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a channel owned by {@code parent} for a socket whose remote address is already known.
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, Native.EPOLLIN, remote);
    // Also ask for EPOLLRDHUP so the channel is told when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a parentless channel with an explicit initial {@code active} state.
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, Native.EPOLLIN, active);
    // Also ask for EPOLLRDHUP so the channel is told when the remote peer closes the connection.
    flags |= Native.EPOLLRDHUP;
}
116 
117     @Override
118     protected AbstractEpollUnsafe newUnsafe() {
119         return new EpollStreamUnsafe();
120     }
121 
122     @Override
123     public ChannelMetadata metadata() {
124         return METADATA;
125     }
126 
127     /**
128      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
129      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
130      * splice until the {@link ChannelFuture} was canceled or it was failed.
131      *
132      * Please note:
133      * <ul>
134      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
135      *   {@link IllegalArgumentException} is thrown. </li>
136      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
137      *   target {@link AbstractEpollStreamChannel}</li>
138      * </ul>
139      *
140      */
141     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
142         return spliceTo(ch, len, newPromise());
143     }
144 
145     /**
146      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
147      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
148      * splice until the {@link ChannelFuture} was canceled or it was failed.
149      *
150      * Please note:
151      * <ul>
152      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
153      *   {@link IllegalArgumentException} is thrown. </li>
154      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
155      *   target {@link AbstractEpollStreamChannel}</li>
156      * </ul>
157      *
158      */
159     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
160                                         final ChannelPromise promise) {
161         if (ch.eventLoop() != eventLoop()) {
162             throw new IllegalArgumentException("EventLoops are not the same.");
163         }
164         if (len < 0) {
165             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
166         }
167         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
168                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
169             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
170         }
171         checkNotNull(promise, "promise");
172         if (!isOpen()) {
173             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
174         } else {
175             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
176             failSpliceIfClosed(promise);
177         }
178         return promise;
179     }
180 
181     /**
182      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
183      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
184      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
185      * {@link ChannelFuture} was canceled or it was failed.
186      *
187      * Please note:
188      * <ul>
189      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
190      *   {@link AbstractEpollStreamChannel}</li>
191      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
192      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
193      * </ul>
194      */
195     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
196         return spliceTo(ch, offset, len, newPromise());
197     }
198 
199     /**
200      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
201      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
202      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
203      * {@link ChannelFuture} was canceled or it was failed.
204      *
205      * Please note:
206      * <ul>
207      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
208      *   {@link AbstractEpollStreamChannel}</li>
209      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
210      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
211      * </ul>
212      */
213     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
214                                         final ChannelPromise promise) {
215         if (len < 0) {
216             throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
217         }
218         if (offset < 0) {
219             throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
220         }
221         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
222             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
223         }
224         checkNotNull(promise, "promise");
225         if (!isOpen()) {
226             promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
227         } else {
228             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
229             failSpliceIfClosed(promise);
230         }
231         return promise;
232     }
233 
234     private void failSpliceIfClosed(ChannelPromise promise) {
235         if (!isOpen()) {
236             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
237             // cases where a future may not be notified otherwise.
238             if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
239                 eventLoop().execute(new Runnable() {
240                     @Override
241                     public void run() {
242                         // Call this via the EventLoop as it is a MPSC queue.
243                         clearSpliceQueue();
244                     }
245                 });
246             }
247         }
248     }
249 
250     /**
251      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
252      * @param in the collection which contains objects to write.
253      * @param buf the {@link ByteBuf} from which the bytes should be written
254      * @return The value that should be decremented from the write quantum which starts at
255      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
256      * <ul>
257      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
258      *     is encountered</li>
259      *     <li>1 - if a single call to write data was made to the OS</li>
260      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
261      *     no data was accepted</li>
262      * </ul>
263      */
264     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
265         int readableBytes = buf.readableBytes();
266         if (readableBytes == 0) {
267             in.remove();
268             return 0;
269         }
270 
271         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
272             return doWriteBytes(in, buf);
273         } else {
274             ByteBuffer[] nioBuffers = buf.nioBuffers();
275             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
276                     config().getMaxBytesPerGatheringWrite());
277         }
278     }
279 
280     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
281         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
282         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
283         // make a best effort to adjust as OS behavior changes.
284         if (attempted == written) {
285             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
286                 config().setMaxBytesPerGatheringWrite(attempted << 1);
287             }
288         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
289             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
290         }
291     }
292 
293     /**
294      * Write multiple bytes via {@link IovArray}.
295      * @param in the collection which contains objects to write.
296      * @param array The array which contains the content to write.
297      * @return The value that should be decremented from the write quantum which starts at
298      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
299      * <ul>
300      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
301      *     is encountered</li>
302      *     <li>1 - if a single call to write data was made to the OS</li>
303      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
304      *     no data was accepted</li>
305      * </ul>
306      * @throws IOException If an I/O exception occurs during write.
307      */
308     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
309         final long expectedWrittenBytes = array.size();
310         assert expectedWrittenBytes != 0;
311         final int cnt = array.count();
312         assert cnt != 0;
313 
314         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
315         if (localWrittenBytes > 0) {
316             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
317             in.removeBytes(localWrittenBytes);
318             return 1;
319         }
320         return WRITE_STATUS_SNDBUF_FULL;
321     }
322 
323     /**
324      * Write multiple bytes via {@link ByteBuffer} array.
325      * @param in the collection which contains objects to write.
326      * @param nioBuffers The buffers to write.
327      * @param nioBufferCnt The number of buffers to write.
328      * @param expectedWrittenBytes The number of bytes we expect to write.
329      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
330      * @return The value that should be decremented from the write quantum which starts at
331      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
332      * <ul>
333      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
334      *     is encountered</li>
335      *     <li>1 - if a single call to write data was made to the OS</li>
336      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
337      *     no data was accepted</li>
338      * </ul>
339      * @throws IOException If an I/O exception occurs during write.
340      */
341     private int writeBytesMultiple(
342             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
343             long maxBytesPerGatheringWrite) throws IOException {
344         assert expectedWrittenBytes != 0;
345         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
346             expectedWrittenBytes = maxBytesPerGatheringWrite;
347         }
348 
349         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
350         if (localWrittenBytes > 0) {
351             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
352             in.removeBytes(localWrittenBytes);
353             return 1;
354         }
355         return WRITE_STATUS_SNDBUF_FULL;
356     }
357 
358     /**
359      * Write a {@link DefaultFileRegion}
360      * @param in the collection which contains objects to write.
361      * @param region the {@link DefaultFileRegion} from which the bytes should be written
362      * @return The value that should be decremented from the write quantum which starts at
363      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
364      * <ul>
365      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
366      *     is encountered</li>
367      *     <li>1 - if a single call to write data was made to the OS</li>
368      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
369      *     no data was accepted</li>
370      * </ul>
371      */
372     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
373         final long regionCount = region.count();
374         if (region.transferred() >= regionCount) {
375             in.remove();
376             return 0;
377         }
378 
379         final long offset = region.transferred();
380         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
381         if (flushedAmount > 0) {
382             in.progress(flushedAmount);
383             if (region.transferred() >= regionCount) {
384                 in.remove();
385             }
386             return 1;
387         }
388         return WRITE_STATUS_SNDBUF_FULL;
389     }
390 
391     /**
392      * Write a {@link FileRegion}
393      * @param in the collection which contains objects to write.
394      * @param region the {@link FileRegion} from which the bytes should be written
395      * @return The value that should be decremented from the write quantum which starts at
396      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
397      * <ul>
398      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
399      *     is encountered</li>
400      *     <li>1 - if a single call to write data was made to the OS</li>
401      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
402      *     no data was accepted</li>
403      * </ul>
404      */
405     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
406         if (region.transferred() >= region.count()) {
407             in.remove();
408             return 0;
409         }
410 
411         if (byteChannel == null) {
412             byteChannel = new EpollSocketWritableByteChannel();
413         }
414         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
415         if (flushedAmount > 0) {
416             in.progress(flushedAmount);
417             if (region.transferred() >= region.count()) {
418                 in.remove();
419             }
420             return 1;
421         }
422         return WRITE_STATUS_SNDBUF_FULL;
423     }
424 
425     @Override
426     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
427         int writeSpinCount = config().getWriteSpinCount();
428         do {
429             final int msgCount = in.size();
430             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
431             if (msgCount > 1 && in.current() instanceof ByteBuf) {
432                 writeSpinCount -= doWriteMultiple(in);
433             } else if (msgCount == 0) {
434                 // Wrote all messages.
435                 clearFlag(Native.EPOLLOUT);
436                 // Return here so we not set the EPOLLOUT flag.
437                 return;
438             } else {  // msgCount == 1
439                 writeSpinCount -= doWriteSingle(in);
440             }
441 
442             // We do not break the loop here even if the outbound buffer was flushed completely,
443             // because a user might have triggered another write and flush when we notify his or her
444             // listeners.
445         } while (writeSpinCount > 0);
446 
447         if (writeSpinCount == 0) {
448             // We used our writeSpin quantum, and should try to write again later.
449             eventLoop().execute(flushTask);
450         } else {
451             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
452             // when it can accept more data.
453             setFlag(Native.EPOLLOUT);
454         }
455     }
456 
457     /**
458      * Attempt to write a single object.
459      * @param in the collection which contains objects to write.
460      * @return The value that should be decremented from the write quantum which starts at
461      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
462      * <ul>
463      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
464      *     is encountered</li>
465      *     <li>1 - if a single call to write data was made to the OS</li>
466      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
467      *     no data was accepted</li>
468      * </ul>
469      * @throws Exception If an I/O error occurs.
470      */
471     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
472         // The outbound buffer contains only one message or it contains a file region.
473         Object msg = in.current();
474         if (msg instanceof ByteBuf) {
475             return writeBytes(in, (ByteBuf) msg);
476         } else if (msg instanceof DefaultFileRegion) {
477             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
478         } else if (msg instanceof FileRegion) {
479             return writeFileRegion(in, (FileRegion) msg);
480         } else if (msg instanceof SpliceOutTask) {
481             if (!((SpliceOutTask) msg).spliceOut()) {
482                 return WRITE_STATUS_SNDBUF_FULL;
483             }
484             in.remove();
485             return 1;
486         } else {
487             // Should never reach here.
488             throw new Error();
489         }
490     }
491 
492     /**
493      * Attempt to write multiple {@link ByteBuf} objects.
494      * @param in the collection which contains objects to write.
495      * @return The value that should be decremented from the write quantum which starts at
496      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
497      * <ul>
498      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
499      *     is encountered</li>
500      *     <li>1 - if a single call to write data was made to the OS</li>
501      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
502      *     no data was accepted</li>
503      * </ul>
504      * @throws Exception If an I/O error occurs.
505      */
506     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
507         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
508         if (PlatformDependent.hasUnsafe()) {
509             IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
510             array.maxBytes(maxBytesPerGatheringWrite);
511             in.forEachFlushedMessage(array);
512 
513             if (array.count() >= 1) {
514                 // TODO: Handle the case where cnt == 1 specially.
515                 return writeBytesMultiple(in, array);
516             }
517         } else {
518             ByteBuffer[] buffers = in.nioBuffers();
519             int cnt = in.nioBufferCount();
520             if (cnt >= 1) {
521                 // TODO: Handle the case where cnt == 1 specially.
522                 return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
523             }
524         }
525         // cnt == 0, which means the outbound buffer contained empty buffers only.
526         in.removeBytes(0);
527         return 0;
528     }
529 
530     @Override
531     protected Object filterOutboundMessage(Object msg) {
532         if (msg instanceof ByteBuf) {
533             ByteBuf buf = (ByteBuf) msg;
534             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
535         }
536 
537         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
538             return msg;
539         }
540 
541         throw new UnsupportedOperationException(
542                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
543     }
544 
545     @UnstableApi
546     @Override
547     protected final void doShutdownOutput() throws Exception {
548         socket.shutdown(false, true);
549     }
550 
551     private void shutdownInput0(final ChannelPromise promise) {
552         try {
553             socket.shutdown(true, false);
554             promise.setSuccess();
555         } catch (Throwable cause) {
556             promise.setFailure(cause);
557         }
558     }
559 
560     @Override
561     public boolean isOutputShutdown() {
562         return socket.isOutputShutdown();
563     }
564 
565     @Override
566     public boolean isInputShutdown() {
567         return socket.isInputShutdown();
568     }
569 
570     @Override
571     public boolean isShutdown() {
572         return socket.isShutdown();
573     }
574 
575     @Override
576     public ChannelFuture shutdownOutput() {
577         return shutdownOutput(newPromise());
578     }
579 
580     @Override
581     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
582         EventLoop loop = eventLoop();
583         if (loop.inEventLoop()) {
584             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
585         } else {
586             loop.execute(new Runnable() {
587                 @Override
588                 public void run() {
589                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
590                 }
591             });
592         }
593 
594         return promise;
595     }
596 
597     @Override
598     public ChannelFuture shutdownInput() {
599         return shutdownInput(newPromise());
600     }
601 
602     @Override
603     public ChannelFuture shutdownInput(final ChannelPromise promise) {
604         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
605         if (closeExecutor != null) {
606             closeExecutor.execute(new Runnable() {
607                 @Override
608                 public void run() {
609                     shutdownInput0(promise);
610                 }
611             });
612         } else {
613             EventLoop loop = eventLoop();
614             if (loop.inEventLoop()) {
615                 shutdownInput0(promise);
616             } else {
617                 loop.execute(new Runnable() {
618                     @Override
619                     public void run() {
620                         shutdownInput0(promise);
621                     }
622                 });
623             }
624         }
625         return promise;
626     }
627 
628     @Override
629     public ChannelFuture shutdown() {
630         return shutdown(newPromise());
631     }
632 
633     @Override
634     public ChannelFuture shutdown(final ChannelPromise promise) {
635         ChannelFuture shutdownOutputFuture = shutdownOutput();
636         if (shutdownOutputFuture.isDone()) {
637             shutdownOutputDone(shutdownOutputFuture, promise);
638         } else {
639             shutdownOutputFuture.addListener(new ChannelFutureListener() {
640                 @Override
641                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
642                     shutdownOutputDone(shutdownOutputFuture, promise);
643                 }
644             });
645         }
646         return promise;
647     }
648 
649     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
650         ChannelFuture shutdownInputFuture = shutdownInput();
651         if (shutdownInputFuture.isDone()) {
652             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
653         } else {
654             shutdownInputFuture.addListener(new ChannelFutureListener() {
655                 @Override
656                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
657                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
658                 }
659             });
660         }
661     }
662 
663     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
664                               ChannelFuture shutdownInputFuture,
665                               ChannelPromise promise) {
666         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
667         Throwable shutdownInputCause = shutdownInputFuture.cause();
668         if (shutdownOutputCause != null) {
669             if (shutdownInputCause != null) {
670                 logger.debug("Exception suppressed because a previous exception occurred.",
671                         shutdownInputCause);
672             }
673             promise.setFailure(shutdownOutputCause);
674         } else if (shutdownInputCause != null) {
675             promise.setFailure(shutdownInputCause);
676         } else {
677             promise.setSuccess();
678         }
679     }
680 
681     @Override
682     protected void doClose() throws Exception {
683         try {
684             // Calling super.doClose() first so spliceTo(...) will fail on next call.
685             super.doClose();
686         } finally {
687             safeClosePipe(pipeIn);
688             safeClosePipe(pipeOut);
689             clearSpliceQueue();
690         }
691     }
692 
693     private void clearSpliceQueue() {
694         if (spliceQueue == null) {
695             return;
696         }
697         for (;;) {
698             SpliceInTask task = spliceQueue.poll();
699             if (task == null) {
700                 break;
701             }
702             task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
703         }
704     }
705 
706     private static void safeClosePipe(FileDescriptor fd) {
707         if (fd != null) {
708             try {
709                 fd.close();
710             } catch (IOException e) {
711                 if (logger.isWarnEnabled()) {
712                     logger.warn("Error while closing a pipe", e);
713                 }
714             }
715         }
716     }
717 
718     class EpollStreamUnsafe extends AbstractEpollUnsafe {
719         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
720         @Override
721         protected Executor prepareToClose() {
722             return super.prepareToClose();
723         }
724 
725         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
726                 EpollRecvByteAllocatorHandle allocHandle) {
727             if (byteBuf != null) {
728                 if (byteBuf.isReadable()) {
729                     readPending = false;
730                     pipeline.fireChannelRead(byteBuf);
731                 } else {
732                     byteBuf.release();
733                 }
734             }
735             allocHandle.readComplete();
736             pipeline.fireChannelReadComplete();
737             pipeline.fireExceptionCaught(cause);
738             if (close || cause instanceof IOException) {
739                 shutdownInput(false);
740             }
741         }
742 
743         @Override
744         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
745             return new EpollRecvByteAllocatorStreamingHandle(handle);
746         }
747 
748         @Override
749         void epollInReady() {
750             final ChannelConfig config = config();
751             if (shouldBreakEpollInReady(config)) {
752                 clearEpollIn0();
753                 return;
754             }
755             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
756             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
757 
758             final ChannelPipeline pipeline = pipeline();
759             final ByteBufAllocator allocator = config.getAllocator();
760             allocHandle.reset(config);
761             epollInBefore();
762 
763             ByteBuf byteBuf = null;
764             boolean close = false;
765             try {
766                 do {
767                     if (spliceQueue != null) {
768                         SpliceInTask spliceTask = spliceQueue.peek();
769                         if (spliceTask != null) {
770                             if (spliceTask.spliceIn(allocHandle)) {
771                                 // We need to check if it is still active as if not we removed all SpliceTasks in
772                                 // doClose(...)
773                                 if (isActive()) {
774                                     spliceQueue.remove();
775                                 }
776                                 continue;
777                             } else {
778                                 break;
779                             }
780                         }
781                     }
782 
783                     // we use a direct buffer here as the native implementations only be able
784                     // to handle direct buffers.
785                     byteBuf = allocHandle.allocate(allocator);
786                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
787                     if (allocHandle.lastBytesRead() <= 0) {
788                         // nothing was read, release the buffer.
789                         byteBuf.release();
790                         byteBuf = null;
791                         close = allocHandle.lastBytesRead() < 0;
792                         if (close) {
793                             // There is nothing left to read as we received an EOF.
794                             readPending = false;
795                         }
796                         break;
797                     }
798                     allocHandle.incMessagesRead(1);
799                     readPending = false;
800                     pipeline.fireChannelRead(byteBuf);
801                     byteBuf = null;
802 
803                     if (shouldBreakEpollInReady(config)) {
804                         // We need to do this for two reasons:
805                         //
806                         // - If the input was shutdown in between (which may be the case when the user did it in the
807                         //   fireChannelRead(...) method we should not try to read again to not produce any
808                         //   miss-leading exceptions.
809                         //
810                         // - If the user closes the channel we need to ensure we not try to read from it again as
811                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
812                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
813                         //   reading data from a filedescriptor that belongs to another socket then the socket that
814                         //   was "wrapped" by this Channel implementation.
815                         break;
816                     }
817                 } while (allocHandle.continueReading());
818 
819                 allocHandle.readComplete();
820                 pipeline.fireChannelReadComplete();
821 
822                 if (close) {
823                     shutdownInput(false);
824                 }
825             } catch (Throwable t) {
826                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
827             } finally {
828                 epollInFinally(config);
829             }
830         }
831     }
832 
833     private void addToSpliceQueue(final SpliceInTask task) {
834         EventLoop eventLoop = eventLoop();
835         if (eventLoop.inEventLoop()) {
836             addToSpliceQueue0(task);
837         } else {
838             eventLoop.execute(new Runnable() {
839                 @Override
840                 public void run() {
841                     addToSpliceQueue0(task);
842                 }
843             });
844         }
845     }
846 
847     private void addToSpliceQueue0(SpliceInTask task) {
848         if (spliceQueue == null) {
849             spliceQueue = PlatformDependent.newMpscQueue();
850         }
851         spliceQueue.add(task);
852     }
853 
854     protected abstract class SpliceInTask {
855         final ChannelPromise promise;
856         int len;
857 
858         protected SpliceInTask(int len, ChannelPromise promise) {
859             this.promise = promise;
860             this.len = len;
861         }
862 
863         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
864 
865         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
866             // calculate the maximum amount of data we are allowed to splice
867             int length = Math.min(handle.guess(), len);
868             int splicedIn = 0;
869             for (;;) {
870                 // Splicing until there is nothing left to splice.
871                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
872                 if (localSplicedIn == 0) {
873                     break;
874                 }
875                 splicedIn += localSplicedIn;
876                 length -= localSplicedIn;
877             }
878 
879             return splicedIn;
880         }
881     }
882 
883     // Let it directly implement channelFutureListener as well to reduce object creation.
884     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
885         private final AbstractEpollStreamChannel ch;
886 
887         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
888             super(len, promise);
889             this.ch = ch;
890         }
891 
892         @Override
893         public void operationComplete(ChannelFuture future) throws Exception {
894             if (!future.isSuccess()) {
895                 promise.setFailure(future.cause());
896             }
897         }
898 
899         @Override
900         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
901             assert ch.eventLoop().inEventLoop();
902             if (len == 0) {
903                 promise.setSuccess();
904                 return true;
905             }
906             try {
907                 // We create the pipe on the target channel as this will allow us to just handle pending writes
908                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
909                 // on multiple Channels pointing to one target Channel.
910                 FileDescriptor pipeOut = ch.pipeOut;
911                 if (pipeOut == null) {
912                     // Create a new pipe as non was created before.
913                     FileDescriptor[] pipe = pipe();
914                     ch.pipeIn = pipe[0];
915                     pipeOut = ch.pipeOut = pipe[1];
916                 }
917 
918                 int splicedIn = spliceIn(pipeOut, handle);
919                 if (splicedIn > 0) {
920                     // Integer.MAX_VALUE is a special value which will result in splice forever.
921                     if (len != Integer.MAX_VALUE) {
922                         len -= splicedIn;
923                     }
924 
925                     // Depending on if we are done with splicing inbound data we set the right promise for the
926                     // outbound splicing.
927                     final ChannelPromise splicePromise;
928                     if (len == 0) {
929                         splicePromise = promise;
930                     } else {
931                         splicePromise = ch.newPromise().addListener(this);
932                     }
933 
934                     boolean autoRead = config().isAutoRead();
935 
936                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
937                     // case.
938                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
939                     ch.unsafe().flush();
940                     if (autoRead && !splicePromise.isDone()) {
941                         // Write was not done which means the target channel was not writable. In this case we need to
942                         // disable reading until we are done with splicing to the target channel because:
943                         //
944                         // - The user may want to to trigger another splice operation once the splicing was complete.
945                         config().setAutoRead(false);
946                     }
947                 }
948 
949                 return len == 0;
950             } catch (Throwable cause) {
951                 promise.setFailure(cause);
952                 return true;
953             }
954         }
955     }
956 
957     private final class SpliceOutTask {
958         private final AbstractEpollStreamChannel ch;
959         private final boolean autoRead;
960         private int len;
961 
962         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
963             this.ch = ch;
964             this.len = len;
965             this.autoRead = autoRead;
966         }
967 
968         public boolean spliceOut() throws Exception {
969             assert ch.eventLoop().inEventLoop();
970             try {
971                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
972                 len -= splicedOut;
973                 if (len == 0) {
974                     if (autoRead) {
975                         // AutoRead was used and we spliced everything so start reading again
976                         config().setAutoRead(true);
977                     }
978                     return true;
979                 }
980                 return false;
981             } catch (IOException e) {
982                 if (autoRead) {
983                     // AutoRead was used and we spliced everything so start reading again
984                     config().setAutoRead(true);
985                 }
986                 throw e;
987             }
988         }
989     }
990 
991     private final class SpliceFdTask extends SpliceInTask {
992         private final FileDescriptor fd;
993         private final ChannelPromise promise;
994         private final int offset;
995 
996         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
997             super(len, promise);
998             this.fd = fd;
999             this.promise = promise;
1000             this.offset = offset;
1001         }
1002 
1003         @Override
1004         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1005             assert eventLoop().inEventLoop();
1006             if (len == 0) {
1007                 promise.setSuccess();
1008                 return true;
1009             }
1010 
1011             try {
1012                 FileDescriptor[] pipe = pipe();
1013                 FileDescriptor pipeIn = pipe[0];
1014                 FileDescriptor pipeOut = pipe[1];
1015                 try {
1016                     int splicedIn = spliceIn(pipeOut, handle);
1017                     if (splicedIn > 0) {
1018                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1019                         if (len != Integer.MAX_VALUE) {
1020                             len -= splicedIn;
1021                         }
1022                         do {
1023                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1024                             splicedIn -= splicedOut;
1025                         } while (splicedIn > 0);
1026                         if (len == 0) {
1027                             promise.setSuccess();
1028                             return true;
1029                         }
1030                     }
1031                     return false;
1032                 } finally {
1033                     safeClosePipe(pipeIn);
1034                     safeClosePipe(pipeOut);
1035                 }
1036             } catch (Throwable cause) {
1037                 promise.setFailure(cause);
1038                 return true;
1039             }
1040         }
1041     }
1042 
1043     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1044         EpollSocketWritableByteChannel() {
1045             super(socket);
1046         }
1047 
1048         @Override
1049         protected ByteBufAllocator alloc() {
1050             return AbstractEpollStreamChannel.this.alloc();
1051         }
1052     }
1053 }