View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.internal.ChannelUtils;
32  import io.netty.channel.socket.DuplexChannel;
33  import io.netty.channel.unix.IovArray;
34  import io.netty.channel.unix.SocketWritableByteChannel;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.internal.StringUtil;
37  import io.netty.util.internal.UnstableApi;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.net.SocketAddress;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.WritableByteChannel;
45  import java.util.concurrent.Executor;
46  
47  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
48  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
49  
50  public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
    // Logger for this class; used in this file to report a suppressed input-shutdown failure at debug level.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
    // The single metadata instance handed out by metadata().
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
    // Created lazily by writeFileRegion(...) the first time a generic FileRegion has to be written.
    private WritableByteChannel byteChannel;
    // Task submitted to the event loop by doWrite(...) once the write spin count has been used up.
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Calling flush0 directly to ensure we do not try to flush messages that were added via write(...) in the
            // meantime.
            ((AbstractKQueueUnsafe) unsafe()).flush0();
        }
    };
65  
    /**
     * Creates a new instance by handing all arguments to the superclass constructor unchanged.
     *
     * @param parent the parent {@link Channel} passed on to the superclass
     * @param fd the {@link BsdSocket} passed on to the superclass
     * @param active the active flag passed on to the superclass
     */
    AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
        super(parent, fd, active);
    }
69  
    /**
     * Creates a new instance by handing all arguments to the superclass constructor unchanged.
     *
     * @param parent the parent {@link Channel} passed on to the superclass
     * @param fd the {@link BsdSocket} passed on to the superclass
     * @param remote the remote {@link SocketAddress} passed on to the superclass
     */
    AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
        super(parent, fd, remote);
    }
73  
    /**
     * Creates a new instance without a parent channel. The active flag is taken from the result of
     * {@code isSoErrorZero(fd)}.
     *
     * @param fd the {@link BsdSocket} to wrap
     */
    AbstractKQueueStreamChannel(BsdSocket fd) {
        this(null, fd, isSoErrorZero(fd));
    }
77  
    /**
     * Creates the {@link KQueueStreamUnsafe} that serves as the unsafe implementation of this channel.
     *
     * @return a new {@link KQueueStreamUnsafe}
     */
    @Override
    protected AbstractKQueueUnsafe newUnsafe() {
        return new KQueueStreamUnsafe();
    }
82  
    /**
     * Returns the {@link ChannelMetadata} constant shared by all instances of this class.
     *
     * @return the shared {@link ChannelMetadata}
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
87  
88      /**
89       * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
90       * @param in the collection which contains objects to write.
91       * @param buf the {@link ByteBuf} from which the bytes should be written
92       * @return The value that should be decremented from the write quantum which starts at
93       * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
94       * <ul>
95       *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
96       *     is encountered</li>
97       *     <li>1 - if a single call to write data was made to the OS</li>
98       *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
99       *     data was accepted</li>
100      * </ul>
101      */
102     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
103         int readableBytes = buf.readableBytes();
104         if (readableBytes == 0) {
105             in.remove();
106             return 0;
107         }
108 
109         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
110             return doWriteBytes(in, buf);
111         } else {
112             ByteBuffer[] nioBuffers = buf.nioBuffers();
113             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
114                     config().getMaxBytesPerGatheringWrite());
115         }
116     }
117 
118     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
119         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
120         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
121         // make a best effort to adjust as OS behavior changes.
122         if (attempted == written) {
123             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
124                 config().setMaxBytesPerGatheringWrite(attempted << 1);
125             }
126         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
127             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
128         }
129     }
130 
131     /**
132      * Write multiple bytes via {@link IovArray}.
133      * @param in the collection which contains objects to write.
134      * @param array The array which contains the content to write.
135      * @return The value that should be decremented from the write quantum which starts at
136      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
137      * <ul>
138      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
139      *     is encountered</li>
140      *     <li>1 - if a single call to write data was made to the OS</li>
141      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
142      *     no data was accepted</li>
143      * </ul>
144      * @throws IOException If an I/O exception occurs during write.
145      */
146     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
147         final long expectedWrittenBytes = array.size();
148         assert expectedWrittenBytes != 0;
149         final int cnt = array.count();
150         assert cnt != 0;
151 
152         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
153         if (localWrittenBytes > 0) {
154             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
155             in.removeBytes(localWrittenBytes);
156             return 1;
157         }
158         return WRITE_STATUS_SNDBUF_FULL;
159     }
160 
161     /**
162      * Write multiple bytes via {@link ByteBuffer} array.
163      * @param in the collection which contains objects to write.
164      * @param nioBuffers The buffers to write.
165      * @param nioBufferCnt The number of buffers to write.
166      * @param expectedWrittenBytes The number of bytes we expect to write.
167      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
168      * @return The value that should be decremented from the write quantum which starts at
169      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
170      * <ul>
171      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
172      *     is encountered</li>
173      *     <li>1 - if a single call to write data was made to the OS</li>
174      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
175      *     no data was accepted</li>
176      * </ul>
177      * @throws IOException If an I/O exception occurs during write.
178      */
179     private int writeBytesMultiple(
180             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
181             long maxBytesPerGatheringWrite) throws IOException {
182         assert expectedWrittenBytes != 0;
183         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
184             expectedWrittenBytes = maxBytesPerGatheringWrite;
185         }
186 
187         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
188         if (localWrittenBytes > 0) {
189             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
190             in.removeBytes(localWrittenBytes);
191             return 1;
192         }
193         return WRITE_STATUS_SNDBUF_FULL;
194     }
195 
196     /**
197      * Write a {@link DefaultFileRegion}
198      * @param in the collection which contains objects to write.
199      * @param region the {@link DefaultFileRegion} from which the bytes should be written
200      * @return The value that should be decremented from the write quantum which starts at
201      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
202      * <ul>
203      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
204      *     is encountered</li>
205      *     <li>1 - if a single call to write data was made to the OS</li>
206      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
207      *     no data was accepted</li>
208      * </ul>
209      */
210     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
211         final long regionCount = region.count();
212         final long offset = region.transferred();
213 
214         if (offset >= regionCount) {
215             in.remove();
216             return 0;
217         }
218 
219         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
220         if (flushedAmount > 0) {
221             in.progress(flushedAmount);
222             if (region.transferred() >= regionCount) {
223                 in.remove();
224             }
225             return 1;
226         } else if (flushedAmount == 0) {
227             validateFileRegion(region, offset);
228         }
229         return WRITE_STATUS_SNDBUF_FULL;
230     }
231 
232     /**
233      * Write a {@link FileRegion}
234      * @param in the collection which contains objects to write.
235      * @param region the {@link FileRegion} from which the bytes should be written
236      * @return The value that should be decremented from the write quantum which starts at
237      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
238      * <ul>
239      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
240      *     is encountered</li>
241      *     <li>1 - if a single call to write data was made to the OS</li>
242      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
243      *     data was accepted</li>
244      * </ul>
245      */
246     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
247         if (region.transferred() >= region.count()) {
248             in.remove();
249             return 0;
250         }
251 
252         if (byteChannel == null) {
253             byteChannel = new KQueueSocketWritableByteChannel();
254         }
255         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
256         if (flushedAmount > 0) {
257             in.progress(flushedAmount);
258             if (region.transferred() >= region.count()) {
259                 in.remove();
260             }
261             return 1;
262         }
263         return WRITE_STATUS_SNDBUF_FULL;
264     }
265 
266     @Override
267     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
268         int writeSpinCount = config().getWriteSpinCount();
269         do {
270             final int msgCount = in.size();
271             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
272             if (msgCount > 1 && in.current() instanceof ByteBuf) {
273                 writeSpinCount -= doWriteMultiple(in);
274             } else if (msgCount == 0) {
275                 // Wrote all messages.
276                 writeFilter(false);
277                 // Return here so we don't set the WRITE flag.
278                 return;
279             } else { // msgCount == 1
280                 writeSpinCount -= doWriteSingle(in);
281             }
282 
283             // We do not break the loop here even if the outbound buffer was flushed completely,
284             // because a user might have triggered another write and flush when we notify his or her
285             // listeners.
286         } while (writeSpinCount > 0);
287 
288         if (writeSpinCount == 0) {
289             // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
290             // then use our write quantum. In this case we no longer want to set the write filter because the socket is
291             // still writable (as far as we know). We will find out next time we attempt to write if the socket is
292             // writable and set the write filter if necessary.
293             writeFilter(false);
294 
295             // We used our writeSpin quantum, and should try to write again later.
296             eventLoop().execute(flushTask);
297         } else {
298             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
299             // when it can accept more data.
300             writeFilter(true);
301         }
302     }
303 
304     /**
305      * Attempt to write a single object.
306      * @param in the collection which contains objects to write.
307      * @return The value that should be decremented from the write quantum which starts at
308      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
309      * <ul>
310      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
311      *     is encountered</li>
312      *     <li>1 - if a single call to write data was made to the OS</li>
313      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
314      *     data was accepted</li>
315      * </ul>
316      * @throws Exception If an I/O error occurs.
317      */
318     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
319         // The outbound buffer contains only one message or it contains a file region.
320         Object msg = in.current();
321         if (msg instanceof ByteBuf) {
322             return writeBytes(in, (ByteBuf) msg);
323         } else if (msg instanceof DefaultFileRegion) {
324             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
325         } else if (msg instanceof FileRegion) {
326             return writeFileRegion(in, (FileRegion) msg);
327         } else {
328             // Should never reach here.
329             throw new Error();
330         }
331     }
332 
333     /**
334      * Attempt to write multiple {@link ByteBuf} objects.
335      * @param in the collection which contains objects to write.
336      * @return The value that should be decremented from the write quantum which starts at
337      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
338      * <ul>
339      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
340      *     is encountered</li>
341      *     <li>1 - if a single call to write data was made to the OS</li>
342      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
343      *     data was accepted</li>
344      * </ul>
345      * @throws Exception If an I/O error occurs.
346      */
347     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
348         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
349         IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
350         array.maxBytes(maxBytesPerGatheringWrite);
351         in.forEachFlushedMessage(array);
352 
353         if (array.count() >= 1) {
354             // TODO: Handle the case where cnt == 1 specially.
355             return writeBytesMultiple(in, array);
356         }
357         // cnt == 0, which means the outbound buffer contained empty buffers only.
358         in.removeBytes(0);
359         return 0;
360     }
361 
362     @Override
363     protected Object filterOutboundMessage(Object msg) {
364         if (msg instanceof ByteBuf) {
365             ByteBuf buf = (ByteBuf) msg;
366             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
367         }
368 
369         if (msg instanceof FileRegion) {
370             return msg;
371         }
372 
373         throw new UnsupportedOperationException(
374                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
375     }
376 
    /**
     * Shuts down the output side of the socket by calling {@code socket.shutdown(false, true)}.
     *
     * @throws Exception if the socket call fails
     */
    @UnstableApi
    @Override
    protected final void doShutdownOutput() throws Exception {
        socket.shutdown(false, true);
    }
382 
    /**
     * Reports the output-shutdown state as tracked by the underlying socket.
     *
     * @return the result of {@code socket.isOutputShutdown()}
     */
    @Override
    public boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }
387 
    /**
     * Reports the input-shutdown state as tracked by the underlying socket.
     *
     * @return the result of {@code socket.isInputShutdown()}
     */
    @Override
    public boolean isInputShutdown() {
        return socket.isInputShutdown();
    }
392 
    /**
     * Reports the shutdown state as tracked by the underlying socket.
     *
     * @return the result of {@code socket.isShutdown()}
     */
    @Override
    public boolean isShutdown() {
        return socket.isShutdown();
    }
397 
    /**
     * Shuts down the output side, using a newly created promise to report the outcome.
     *
     * @return the future that is notified once the operation completes
     */
    @Override
    public ChannelFuture shutdownOutput() {
        return shutdownOutput(newPromise());
    }
402 
403     @Override
404     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
405         EventLoop loop = eventLoop();
406         if (loop.inEventLoop()) {
407             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
408         } else {
409             loop.execute(new Runnable() {
410                 @Override
411                 public void run() {
412                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
413                 }
414             });
415         }
416         return promise;
417     }
418 
    /**
     * Shuts down the input side, using a newly created promise to report the outcome.
     *
     * @return the future that is notified once the operation completes
     */
    @Override
    public ChannelFuture shutdownInput() {
        return shutdownInput(newPromise());
    }
423 
424     @Override
425     public ChannelFuture shutdownInput(final ChannelPromise promise) {
426         EventLoop loop = eventLoop();
427         if (loop.inEventLoop()) {
428             shutdownInput0(promise);
429         } else {
430             loop.execute(new Runnable() {
431                 @Override
432                 public void run() {
433                     shutdownInput0(promise);
434                 }
435             });
436         }
437         return promise;
438     }
439 
440     private void shutdownInput0(ChannelPromise promise) {
441         try {
442             socket.shutdown(true, false);
443         } catch (Throwable cause) {
444             promise.setFailure(cause);
445             return;
446         }
447         promise.setSuccess();
448     }
449 
    /**
     * Shuts down both the output and the input side, using a newly created promise to report the outcome.
     *
     * @return the future that is notified once the operation completes
     */
    @Override
    public ChannelFuture shutdown() {
        return shutdown(newPromise());
    }
454 
455     @Override
456     public ChannelFuture shutdown(final ChannelPromise promise) {
457         ChannelFuture shutdownOutputFuture = shutdownOutput();
458         if (shutdownOutputFuture.isDone()) {
459             shutdownOutputDone(shutdownOutputFuture, promise);
460         } else {
461             shutdownOutputFuture.addListener(new ChannelFutureListener() {
462                 @Override
463                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
464                     shutdownOutputDone(shutdownOutputFuture, promise);
465                 }
466             });
467         }
468         return promise;
469     }
470 
471     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
472         ChannelFuture shutdownInputFuture = shutdownInput();
473         if (shutdownInputFuture.isDone()) {
474             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
475         } else {
476             shutdownInputFuture.addListener(new ChannelFutureListener() {
477                 @Override
478                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
479                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
480                 }
481             });
482         }
483     }
484 
485     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
486                                      ChannelFuture shutdownInputFuture,
487                                      ChannelPromise promise) {
488         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
489         Throwable shutdownInputCause = shutdownInputFuture.cause();
490         if (shutdownOutputCause != null) {
491             if (shutdownInputCause != null) {
492                 logger.debug("Exception suppressed because a previous exception occurred.",
493                         shutdownInputCause);
494             }
495             promise.setFailure(shutdownOutputCause);
496         } else if (shutdownInputCause != null) {
497             promise.setFailure(shutdownInputCause);
498         } else {
499             promise.setSuccess();
500         }
501     }
502 
503     class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
        /**
         * Delegates to the superclass implementation without changing its behavior.
         *
         * @return whatever the superclass implementation returns
         */
        // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
        @Override
        protected Executor prepareToClose() {
            return super.prepareToClose();
        }
509 
        /**
         * Reads from the socket until no more data is available, the allocator handle says to stop, or
         * {@code shouldBreakReadReady(...)} asks to stop, firing {@code channelRead} for every buffer that
         * received data and {@code channelReadComplete} afterwards.
         *
         * @param allocHandle the handle used to allocate receive buffers and to track read statistics
         */
        @Override
        void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                // Reading must not proceed; drop the read filter instead.
                clearReadFilter0();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);
            readReadyBefore();

            // Non-null only while this method still owns a buffer that has not been passed to the pipeline.
            ByteBuf byteBuf = null;
            // Set when a read returned a negative value.
            boolean close = false;
            try {
                do {
                    // We use a direct buffer here as the native implementations are only able
                    // to handle direct buffers.
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read, release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    // Ownership moved to the pipeline; make sure the exception path does not touch it again.
                    byteBuf = null;

                    if (shouldBreakReadReady(config)) {
                        // We need to do this for two reasons:
                        //
                        // - If the input was shut down in between (which may be the case when the user did it in
                        //   the fireChannelRead(...) method) we should not try to read again to not produce any
                        //   misleading exceptions.
                        //
                        // - If the user closes the channel we need to ensure we do not try to read from it again as
                        //   the file descriptor may be re-used already by the OS if the system is handling a lot of
                        //   concurrent connections and so needs a lot of file descriptors. If we do not do this we
                        //   risk reading data from a file descriptor that belongs to another socket than the
                        //   socket that was "wrapped" by this Channel implementation.
                        break;
                    }
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    shutdownInput(false);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Always runs, whether the read loop completed or threw.
                readReadyFinally(config);
            }
        }
574 
575         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
576                                          KQueueRecvByteAllocatorHandle allocHandle) {
577             if (byteBuf != null) {
578                 if (byteBuf.isReadable()) {
579                     readPending = false;
580                     pipeline.fireChannelRead(byteBuf);
581                 } else {
582                     byteBuf.release();
583                 }
584             }
585             if (!failConnectPromise(cause)) {
586                 allocHandle.readComplete();
587                 pipeline.fireChannelReadComplete();
588                 pipeline.fireExceptionCaught(cause);
589 
590                 // If oom will close the read event, release connection.
591                 // See https://github.com/netty/netty/issues/10434
592                 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
593                     shutdownInput(false);
594                 }
595             }
596         }
597     }
598 
    /**
     * {@link SocketWritableByteChannel} bound to this channel's socket that allocates through the enclosing
     * channel's {@link ByteBufAllocator}. Instantiated by {@code writeFileRegion(...)}.
     */
    private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
        KQueueSocketWritableByteChannel() {
            super(socket);
        }

        /**
         * @return the allocator of the enclosing channel
         */
        @Override
        protected ByteBufAllocator alloc() {
            return AbstractKQueueStreamChannel.this.alloc();
        }
    }
609 }