View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.internal.ChannelUtils;
32  import io.netty.channel.socket.DuplexChannel;
33  import io.netty.channel.unix.IovArray;
34  import io.netty.channel.unix.SocketWritableByteChannel;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.internal.StringUtil;
37  import io.netty.util.internal.UnstableApi;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.net.SocketAddress;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.WritableByteChannel;
45  import java.util.concurrent.Executor;
46  
47  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
48  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
49  
50  @UnstableApi
51  public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
53      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
54      private static final String EXPECTED_TYPES =
55              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
56                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
57      private WritableByteChannel byteChannel;
58      private final Runnable flushTask = new Runnable() {
59          @Override
60          public void run() {
61              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
62              // meantime.
63              ((AbstractKQueueUnsafe) unsafe()).flush0();
64          }
65      };
66  
67      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
68          super(parent, fd, active);
69      }
70  
71      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
72          super(parent, fd, remote);
73      }
74  
75      AbstractKQueueStreamChannel(BsdSocket fd) {
76          this(null, fd, isSoErrorZero(fd));
77      }
78  
79      @Override
80      protected AbstractKQueueUnsafe newUnsafe() {
81          return new KQueueStreamUnsafe();
82      }
83  
84      @Override
85      public ChannelMetadata metadata() {
86          return METADATA;
87      }
88  
89      /**
90       * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
91       * @param in the collection which contains objects to write.
92       * @param buf the {@link ByteBuf} from which the bytes should be written
93       * @return The value that should be decremented from the write quantum which starts at
94       * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
95       * <ul>
96       *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
97       *     is encountered</li>
98       *     <li>1 - if a single call to write data was made to the OS</li>
99       *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
100      *     data was accepted</li>
101      * </ul>
102      */
103     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
104         int readableBytes = buf.readableBytes();
105         if (readableBytes == 0) {
106             in.remove();
107             return 0;
108         }
109 
110         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
111             return doWriteBytes(in, buf);
112         } else {
113             ByteBuffer[] nioBuffers = buf.nioBuffers();
114             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
115                     config().getMaxBytesPerGatheringWrite());
116         }
117     }
118 
119     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
120         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
121         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
122         // make a best effort to adjust as OS behavior changes.
123         if (attempted == written) {
124             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
125                 config().setMaxBytesPerGatheringWrite(attempted << 1);
126             }
127         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
128             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
129         }
130     }
131 
132     /**
133      * Write multiple bytes via {@link IovArray}.
134      * @param in the collection which contains objects to write.
135      * @param array The array which contains the content to write.
136      * @return The value that should be decremented from the write quantum which starts at
137      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
138      * <ul>
139      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
140      *     is encountered</li>
141      *     <li>1 - if a single call to write data was made to the OS</li>
142      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
143      *     no data was accepted</li>
144      * </ul>
145      * @throws IOException If an I/O exception occurs during write.
146      */
147     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
148         final long expectedWrittenBytes = array.size();
149         assert expectedWrittenBytes != 0;
150         final int cnt = array.count();
151         assert cnt != 0;
152 
153         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
154         if (localWrittenBytes > 0) {
155             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
156             in.removeBytes(localWrittenBytes);
157             return 1;
158         }
159         return WRITE_STATUS_SNDBUF_FULL;
160     }
161 
162     /**
163      * Write multiple bytes via {@link ByteBuffer} array.
164      * @param in the collection which contains objects to write.
165      * @param nioBuffers The buffers to write.
166      * @param nioBufferCnt The number of buffers to write.
167      * @param expectedWrittenBytes The number of bytes we expect to write.
168      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
169      * @return The value that should be decremented from the write quantum which starts at
170      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
171      * <ul>
172      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
173      *     is encountered</li>
174      *     <li>1 - if a single call to write data was made to the OS</li>
175      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
176      *     no data was accepted</li>
177      * </ul>
178      * @throws IOException If an I/O exception occurs during write.
179      */
180     private int writeBytesMultiple(
181             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
182             long maxBytesPerGatheringWrite) throws IOException {
183         assert expectedWrittenBytes != 0;
184         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
185             expectedWrittenBytes = maxBytesPerGatheringWrite;
186         }
187 
188         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
189         if (localWrittenBytes > 0) {
190             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
191             in.removeBytes(localWrittenBytes);
192             return 1;
193         }
194         return WRITE_STATUS_SNDBUF_FULL;
195     }
196 
197     /**
198      * Write a {@link DefaultFileRegion}
199      * @param in the collection which contains objects to write.
200      * @param region the {@link DefaultFileRegion} from which the bytes should be written
201      * @return The value that should be decremented from the write quantum which starts at
202      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
203      * <ul>
204      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
205      *     is encountered</li>
206      *     <li>1 - if a single call to write data was made to the OS</li>
207      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
208      *     no data was accepted</li>
209      * </ul>
210      */
211     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
212         final long regionCount = region.count();
213         final long offset = region.transferred();
214 
215         if (offset >= regionCount) {
216             in.remove();
217             return 0;
218         }
219 
220         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
221         if (flushedAmount > 0) {
222             in.progress(flushedAmount);
223             if (region.transferred() >= regionCount) {
224                 in.remove();
225             }
226             return 1;
227         } else if (flushedAmount == 0) {
228             validateFileRegion(region, offset);
229         }
230         return WRITE_STATUS_SNDBUF_FULL;
231     }
232 
233     /**
234      * Write a {@link FileRegion}
235      * @param in the collection which contains objects to write.
236      * @param region the {@link FileRegion} from which the bytes should be written
237      * @return The value that should be decremented from the write quantum which starts at
238      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
239      * <ul>
240      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
241      *     is encountered</li>
242      *     <li>1 - if a single call to write data was made to the OS</li>
243      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
244      *     data was accepted</li>
245      * </ul>
246      */
247     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
248         if (region.transferred() >= region.count()) {
249             in.remove();
250             return 0;
251         }
252 
253         if (byteChannel == null) {
254             byteChannel = new KQueueSocketWritableByteChannel();
255         }
256         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
257         if (flushedAmount > 0) {
258             in.progress(flushedAmount);
259             if (region.transferred() >= region.count()) {
260                 in.remove();
261             }
262             return 1;
263         }
264         return WRITE_STATUS_SNDBUF_FULL;
265     }
266 
267     @Override
268     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
269         int writeSpinCount = config().getWriteSpinCount();
270         do {
271             final int msgCount = in.size();
272             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
273             if (msgCount > 1 && in.current() instanceof ByteBuf) {
274                 writeSpinCount -= doWriteMultiple(in);
275             } else if (msgCount == 0) {
276                 // Wrote all messages.
277                 writeFilter(false);
278                 // Return here so we don't set the WRITE flag.
279                 return;
280             } else { // msgCount == 1
281                 writeSpinCount -= doWriteSingle(in);
282             }
283 
284             // We do not break the loop here even if the outbound buffer was flushed completely,
285             // because a user might have triggered another write and flush when we notify his or her
286             // listeners.
287         } while (writeSpinCount > 0);
288 
289         if (writeSpinCount == 0) {
290             // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
291             // then use our write quantum. In this case we no longer want to set the write filter because the socket is
292             // still writable (as far as we know). We will find out next time we attempt to write if the socket is
293             // writable and set the write filter if necessary.
294             writeFilter(false);
295 
296             // We used our writeSpin quantum, and should try to write again later.
297             eventLoop().execute(flushTask);
298         } else {
299             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
300             // when it can accept more data.
301             writeFilter(true);
302         }
303     }
304 
305     /**
306      * Attempt to write a single object.
307      * @param in the collection which contains objects to write.
308      * @return The value that should be decremented from the write quantum which starts at
309      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
310      * <ul>
311      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
312      *     is encountered</li>
313      *     <li>1 - if a single call to write data was made to the OS</li>
314      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
315      *     data was accepted</li>
316      * </ul>
317      * @throws Exception If an I/O error occurs.
318      */
319     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
320         // The outbound buffer contains only one message or it contains a file region.
321         Object msg = in.current();
322         if (msg instanceof ByteBuf) {
323             return writeBytes(in, (ByteBuf) msg);
324         } else if (msg instanceof DefaultFileRegion) {
325             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
326         } else if (msg instanceof FileRegion) {
327             return writeFileRegion(in, (FileRegion) msg);
328         } else {
329             // Should never reach here.
330             throw new Error();
331         }
332     }
333 
334     /**
335      * Attempt to write multiple {@link ByteBuf} objects.
336      * @param in the collection which contains objects to write.
337      * @return The value that should be decremented from the write quantum which starts at
338      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
339      * <ul>
340      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
341      *     is encountered</li>
342      *     <li>1 - if a single call to write data was made to the OS</li>
343      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
344      *     data was accepted</li>
345      * </ul>
346      * @throws Exception If an I/O error occurs.
347      */
348     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
349         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
350         IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
351         array.maxBytes(maxBytesPerGatheringWrite);
352         in.forEachFlushedMessage(array);
353 
354         if (array.count() >= 1) {
355             // TODO: Handle the case where cnt == 1 specially.
356             return writeBytesMultiple(in, array);
357         }
358         // cnt == 0, which means the outbound buffer contained empty buffers only.
359         in.removeBytes(0);
360         return 0;
361     }
362 
363     @Override
364     protected Object filterOutboundMessage(Object msg) {
365         if (msg instanceof ByteBuf) {
366             ByteBuf buf = (ByteBuf) msg;
367             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
368         }
369 
370         if (msg instanceof FileRegion) {
371             return msg;
372         }
373 
374         throw new UnsupportedOperationException(
375                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
376     }
377 
378     @UnstableApi
379     @Override
380     protected final void doShutdownOutput() throws Exception {
381         socket.shutdown(false, true);
382     }
383 
384     @Override
385     public boolean isOutputShutdown() {
386         return socket.isOutputShutdown();
387     }
388 
389     @Override
390     public boolean isInputShutdown() {
391         return socket.isInputShutdown();
392     }
393 
394     @Override
395     public boolean isShutdown() {
396         return socket.isShutdown();
397     }
398 
399     @Override
400     public ChannelFuture shutdownOutput() {
401         return shutdownOutput(newPromise());
402     }
403 
404     @Override
405     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
406         EventLoop loop = eventLoop();
407         if (loop.inEventLoop()) {
408             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
409         } else {
410             loop.execute(new Runnable() {
411                 @Override
412                 public void run() {
413                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
414                 }
415             });
416         }
417         return promise;
418     }
419 
420     @Override
421     public ChannelFuture shutdownInput() {
422         return shutdownInput(newPromise());
423     }
424 
425     @Override
426     public ChannelFuture shutdownInput(final ChannelPromise promise) {
427         EventLoop loop = eventLoop();
428         if (loop.inEventLoop()) {
429             shutdownInput0(promise);
430         } else {
431             loop.execute(new Runnable() {
432                 @Override
433                 public void run() {
434                     shutdownInput0(promise);
435                 }
436             });
437         }
438         return promise;
439     }
440 
441     private void shutdownInput0(ChannelPromise promise) {
442         try {
443             socket.shutdown(true, false);
444         } catch (Throwable cause) {
445             promise.setFailure(cause);
446             return;
447         }
448         promise.setSuccess();
449     }
450 
451     @Override
452     public ChannelFuture shutdown() {
453         return shutdown(newPromise());
454     }
455 
456     @Override
457     public ChannelFuture shutdown(final ChannelPromise promise) {
458         ChannelFuture shutdownOutputFuture = shutdownOutput();
459         if (shutdownOutputFuture.isDone()) {
460             shutdownOutputDone(shutdownOutputFuture, promise);
461         } else {
462             shutdownOutputFuture.addListener(new ChannelFutureListener() {
463                 @Override
464                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
465                     shutdownOutputDone(shutdownOutputFuture, promise);
466                 }
467             });
468         }
469         return promise;
470     }
471 
472     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
473         ChannelFuture shutdownInputFuture = shutdownInput();
474         if (shutdownInputFuture.isDone()) {
475             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
476         } else {
477             shutdownInputFuture.addListener(new ChannelFutureListener() {
478                 @Override
479                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
480                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
481                 }
482             });
483         }
484     }
485 
486     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
487                                      ChannelFuture shutdownInputFuture,
488                                      ChannelPromise promise) {
489         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
490         Throwable shutdownInputCause = shutdownInputFuture.cause();
491         if (shutdownOutputCause != null) {
492             if (shutdownInputCause != null) {
493                 logger.debug("Exception suppressed because a previous exception occurred.",
494                         shutdownInputCause);
495             }
496             promise.setFailure(shutdownOutputCause);
497         } else if (shutdownInputCause != null) {
498             promise.setFailure(shutdownInputCause);
499         } else {
500             promise.setSuccess();
501         }
502     }
503 
504     class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
505         // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
506         @Override
507         protected Executor prepareToClose() {
508             return super.prepareToClose();
509         }
510 
511         @Override
512         void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
513             final ChannelConfig config = config();
514             if (shouldBreakReadReady(config)) {
515                 clearReadFilter0();
516                 return;
517             }
518             final ChannelPipeline pipeline = pipeline();
519             final ByteBufAllocator allocator = config.getAllocator();
520             allocHandle.reset(config);
521             readReadyBefore();
522 
523             ByteBuf byteBuf = null;
524             boolean close = false;
525             try {
526                 do {
527                     // we use a direct buffer here as the native implementations only be able
528                     // to handle direct buffers.
529                     byteBuf = allocHandle.allocate(allocator);
530                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
531                     if (allocHandle.lastBytesRead() <= 0) {
532                         // nothing was read, release the buffer.
533                         byteBuf.release();
534                         byteBuf = null;
535                         close = allocHandle.lastBytesRead() < 0;
536                         if (close) {
537                             // There is nothing left to read as we received an EOF.
538                             readPending = false;
539                         }
540                         break;
541                     }
542                     allocHandle.incMessagesRead(1);
543                     readPending = false;
544                     pipeline.fireChannelRead(byteBuf);
545                     byteBuf = null;
546 
547                     if (shouldBreakReadReady(config)) {
548                         // We need to do this for two reasons:
549                         //
550                         // - If the input was shutdown in between (which may be the case when the user did it in the
551                         //   fireChannelRead(...) method we should not try to read again to not produce any
552                         //   miss-leading exceptions.
553                         //
554                         // - If the user closes the channel we need to ensure we not try to read from it again as
555                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
556                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
557                         //   reading data from a filedescriptor that belongs to another socket then the socket that
558                         //   was "wrapped" by this Channel implementation.
559                         break;
560                     }
561                 } while (allocHandle.continueReading());
562 
563                 allocHandle.readComplete();
564                 pipeline.fireChannelReadComplete();
565 
566                 if (close) {
567                     shutdownInput(false);
568                 }
569             } catch (Throwable t) {
570                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
571             } finally {
572                 readReadyFinally(config);
573             }
574         }
575 
576         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
577                                          KQueueRecvByteAllocatorHandle allocHandle) {
578             if (byteBuf != null) {
579                 if (byteBuf.isReadable()) {
580                     readPending = false;
581                     pipeline.fireChannelRead(byteBuf);
582                 } else {
583                     byteBuf.release();
584                 }
585             }
586             if (!failConnectPromise(cause)) {
587                 allocHandle.readComplete();
588                 pipeline.fireChannelReadComplete();
589                 pipeline.fireExceptionCaught(cause);
590 
591                 // If oom will close the read event, release connection.
592                 // See https://github.com/netty/netty/issues/10434
593                 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
594                     shutdownInput(false);
595                 }
596             }
597         }
598     }
599 
600     private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
601         KQueueSocketWritableByteChannel() {
602             super(socket);
603         }
604 
605         @Override
606         protected ByteBufAllocator alloc() {
607             return AbstractKQueueStreamChannel.this.alloc();
608         }
609     }
610 }