View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.internal.ChannelUtils;
32  import io.netty.channel.socket.DuplexChannel;
33  import io.netty.channel.unix.IovArray;
34  import io.netty.channel.unix.SocketWritableByteChannel;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.UnstableApi;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.SocketAddress;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.WritableByteChannel;
46  import java.util.concurrent.Executor;
47  
48  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
49  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
50  
51  @UnstableApi
52  public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
53      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
54      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
55      private static final String EXPECTED_TYPES =
56              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
57                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
58      private WritableByteChannel byteChannel;
59      private final Runnable flushTask = new Runnable() {
60          @Override
61          public void run() {
62              flush();
63          }
64      };
65  
66      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
67          super(parent, fd, active, true);
68      }
69  
70      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
71          super(parent, fd, remote);
72      }
73  
74      AbstractKQueueStreamChannel(BsdSocket fd) {
75          this(null, fd, isSoErrorZero(fd));
76      }
77  
78      @Override
79      protected AbstractKQueueUnsafe newUnsafe() {
80          return new KQueueStreamUnsafe();
81      }
82  
83      @Override
84      public ChannelMetadata metadata() {
85          return METADATA;
86      }
87  
88      /**
89       * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
90       * @param in the collection which contains objects to write.
91       * @param buf the {@link ByteBuf} from which the bytes should be written
92       * @return The value that should be decremented from the write quantum which starts at
93       * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
94       * <ul>
95       *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
96       *     is encountered</li>
97       *     <li>1 - if a single call to write data was made to the OS</li>
98       *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
99       *     data was accepted</li>
100      * </ul>
101      */
102     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
103         int readableBytes = buf.readableBytes();
104         if (readableBytes == 0) {
105             in.remove();
106             return 0;
107         }
108 
109         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
110             return doWriteBytes(in, buf);
111         } else {
112             ByteBuffer[] nioBuffers = buf.nioBuffers();
113             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
114                     config().getMaxBytesPerGatheringWrite());
115         }
116     }
117 
118     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
119         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
120         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
121         // make a best effort to adjust as OS behavior changes.
122         if (attempted == written) {
123             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
124                 config().setMaxBytesPerGatheringWrite(attempted << 1);
125             }
126         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
127             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
128         }
129     }
130 
131     /**
132      * Write multiple bytes via {@link IovArray}.
133      * @param in the collection which contains objects to write.
134      * @param array The array which contains the content to write.
135      * @return The value that should be decremented from the write quantum which starts at
136      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
137      * <ul>
138      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
139      *     is encountered</li>
140      *     <li>1 - if a single call to write data was made to the OS</li>
141      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
142      *     no data was accepted</li>
143      * </ul>
144      * @throws IOException If an I/O exception occurs during write.
145      */
146     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
147         final long expectedWrittenBytes = array.size();
148         assert expectedWrittenBytes != 0;
149         final int cnt = array.count();
150         assert cnt != 0;
151 
152         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
153         if (localWrittenBytes > 0) {
154             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
155             in.removeBytes(localWrittenBytes);
156             return 1;
157         }
158         return WRITE_STATUS_SNDBUF_FULL;
159     }
160 
161     /**
162      * Write multiple bytes via {@link ByteBuffer} array.
163      * @param in the collection which contains objects to write.
164      * @param nioBuffers The buffers to write.
165      * @param nioBufferCnt The number of buffers to write.
166      * @param expectedWrittenBytes The number of bytes we expect to write.
167      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
168      * @return The value that should be decremented from the write quantum which starts at
169      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
170      * <ul>
171      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
172      *     is encountered</li>
173      *     <li>1 - if a single call to write data was made to the OS</li>
174      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
175      *     no data was accepted</li>
176      * </ul>
177      * @throws IOException If an I/O exception occurs during write.
178      */
179     private int writeBytesMultiple(
180             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
181             long maxBytesPerGatheringWrite) throws IOException {
182         assert expectedWrittenBytes != 0;
183         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
184             expectedWrittenBytes = maxBytesPerGatheringWrite;
185         }
186 
187         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
188         if (localWrittenBytes > 0) {
189             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
190             in.removeBytes(localWrittenBytes);
191             return 1;
192         }
193         return WRITE_STATUS_SNDBUF_FULL;
194     }
195 
196     /**
197      * Write a {@link DefaultFileRegion}
198      * @param in the collection which contains objects to write.
199      * @param region the {@link DefaultFileRegion} from which the bytes should be written
200      * @return The value that should be decremented from the write quantum which starts at
201      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
202      * <ul>
203      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
204      *     is encountered</li>
205      *     <li>1 - if a single call to write data was made to the OS</li>
206      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
207      *     no data was accepted</li>
208      * </ul>
209      */
210     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
211         final long regionCount = region.count();
212         if (region.transferred() >= regionCount) {
213             in.remove();
214             return 0;
215         }
216 
217         final long offset = region.transferred();
218         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
219         if (flushedAmount > 0) {
220             in.progress(flushedAmount);
221             if (region.transferred() >= regionCount) {
222                 in.remove();
223             }
224             return 1;
225         }
226         return WRITE_STATUS_SNDBUF_FULL;
227     }
228 
229     /**
230      * Write a {@link FileRegion}
231      * @param in the collection which contains objects to write.
232      * @param region the {@link FileRegion} from which the bytes should be written
233      * @return The value that should be decremented from the write quantum which starts at
234      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
235      * <ul>
236      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
237      *     is encountered</li>
238      *     <li>1 - if a single call to write data was made to the OS</li>
239      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
240      *     data was accepted</li>
241      * </ul>
242      */
243     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
244         if (region.transferred() >= region.count()) {
245             in.remove();
246             return 0;
247         }
248 
249         if (byteChannel == null) {
250             byteChannel = new KQueueSocketWritableByteChannel();
251         }
252         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
253         if (flushedAmount > 0) {
254             in.progress(flushedAmount);
255             if (region.transferred() >= region.count()) {
256                 in.remove();
257             }
258             return 1;
259         }
260         return WRITE_STATUS_SNDBUF_FULL;
261     }
262 
263     @Override
264     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
265         int writeSpinCount = config().getWriteSpinCount();
266         do {
267             final int msgCount = in.size();
268             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
269             if (msgCount > 1 && in.current() instanceof ByteBuf) {
270                 writeSpinCount -= doWriteMultiple(in);
271             } else if (msgCount == 0) {
272                 // Wrote all messages.
273                 writeFilter(false);
274                 // Return here so we don't set the WRITE flag.
275                 return;
276             } else { // msgCount == 1
277                 writeSpinCount -= doWriteSingle(in);
278             }
279 
280             // We do not break the loop here even if the outbound buffer was flushed completely,
281             // because a user might have triggered another write and flush when we notify his or her
282             // listeners.
283         } while (writeSpinCount > 0);
284 
285         if (writeSpinCount == 0) {
286             // We used our writeSpin quantum, and should try to write again later.
287             eventLoop().execute(flushTask);
288         } else {
289             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
290             // when it can accept more data.
291             writeFilter(true);
292         }
293     }
294 
295     /**
296      * Attempt to write a single object.
297      * @param in the collection which contains objects to write.
298      * @return The value that should be decremented from the write quantum which starts at
299      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
300      * <ul>
301      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
302      *     is encountered</li>
303      *     <li>1 - if a single call to write data was made to the OS</li>
304      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
305      *     data was accepted</li>
306      * </ul>
307      * @throws Exception If an I/O error occurs.
308      */
309     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
310         // The outbound buffer contains only one message or it contains a file region.
311         Object msg = in.current();
312         if (msg instanceof ByteBuf) {
313             return writeBytes(in, (ByteBuf) msg);
314         } else if (msg instanceof DefaultFileRegion) {
315             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
316         } else if (msg instanceof FileRegion) {
317             return writeFileRegion(in, (FileRegion) msg);
318         } else {
319             // Should never reach here.
320             throw new Error();
321         }
322     }
323 
324     /**
325      * Attempt to write multiple {@link ByteBuf} objects.
326      * @param in the collection which contains objects to write.
327      * @return The value that should be decremented from the write quantum which starts at
328      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
329      * <ul>
330      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
331      *     is encountered</li>
332      *     <li>1 - if a single call to write data was made to the OS</li>
333      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
334      *     data was accepted</li>
335      * </ul>
336      * @throws Exception If an I/O error occurs.
337      */
338     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
339         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
340         if (PlatformDependent.hasUnsafe()) {
341             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
342             array.maxBytes(maxBytesPerGatheringWrite);
343             in.forEachFlushedMessage(array);
344 
345             if (array.count() >= 1) {
346                 // TODO: Handle the case where cnt == 1 specially.
347                 return writeBytesMultiple(in, array);
348             }
349         } else {
350             ByteBuffer[] buffers = in.nioBuffers();
351             int cnt = in.nioBufferCount();
352             if (cnt >= 1) {
353                 // TODO: Handle the case where cnt == 1 specially.
354                 return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
355             }
356         }
357         // cnt == 0, which means the outbound buffer contained empty buffers only.
358         in.removeBytes(0);
359         return 0;
360     }
361 
362     @Override
363     protected Object filterOutboundMessage(Object msg) {
364         if (msg instanceof ByteBuf) {
365             ByteBuf buf = (ByteBuf) msg;
366             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
367         }
368 
369         if (msg instanceof FileRegion) {
370             return msg;
371         }
372 
373         throw new UnsupportedOperationException(
374                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
375     }
376 
377     @UnstableApi
378     @Override
379     protected final void doShutdownOutput() throws Exception {
380         socket.shutdown(false, true);
381     }
382 
383     @Override
384     public boolean isOutputShutdown() {
385         return socket.isOutputShutdown();
386     }
387 
388     @Override
389     public boolean isInputShutdown() {
390         return socket.isInputShutdown();
391     }
392 
393     @Override
394     public boolean isShutdown() {
395         return socket.isShutdown();
396     }
397 
398     @Override
399     public ChannelFuture shutdownOutput() {
400         return shutdownOutput(newPromise());
401     }
402 
403     @Override
404     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
405         EventLoop loop = eventLoop();
406         if (loop.inEventLoop()) {
407             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
408         } else {
409             loop.execute(new Runnable() {
410                 @Override
411                 public void run() {
412                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
413                 }
414             });
415         }
416         return promise;
417     }
418 
419     @Override
420     public ChannelFuture shutdownInput() {
421         return shutdownInput(newPromise());
422     }
423 
424     @Override
425     public ChannelFuture shutdownInput(final ChannelPromise promise) {
426         EventLoop loop = eventLoop();
427         if (loop.inEventLoop()) {
428             shutdownInput0(promise);
429         } else {
430             loop.execute(new Runnable() {
431                 @Override
432                 public void run() {
433                     shutdownInput0(promise);
434                 }
435             });
436         }
437         return promise;
438     }
439 
440     private void shutdownInput0(ChannelPromise promise) {
441         try {
442             socket.shutdown(true, false);
443         } catch (Throwable cause) {
444             promise.setFailure(cause);
445             return;
446         }
447         promise.setSuccess();
448     }
449 
450     @Override
451     public ChannelFuture shutdown() {
452         return shutdown(newPromise());
453     }
454 
455     @Override
456     public ChannelFuture shutdown(final ChannelPromise promise) {
457         ChannelFuture shutdownOutputFuture = shutdownOutput();
458         if (shutdownOutputFuture.isDone()) {
459             shutdownOutputDone(shutdownOutputFuture, promise);
460         } else {
461             shutdownOutputFuture.addListener(new ChannelFutureListener() {
462                 @Override
463                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
464                     shutdownOutputDone(shutdownOutputFuture, promise);
465                 }
466             });
467         }
468         return promise;
469     }
470 
471     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
472         ChannelFuture shutdownInputFuture = shutdownInput();
473         if (shutdownInputFuture.isDone()) {
474             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
475         } else {
476             shutdownInputFuture.addListener(new ChannelFutureListener() {
477                 @Override
478                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
479                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
480                 }
481             });
482         }
483     }
484 
485     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
486                                      ChannelFuture shutdownInputFuture,
487                                      ChannelPromise promise) {
488         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
489         Throwable shutdownInputCause = shutdownInputFuture.cause();
490         if (shutdownOutputCause != null) {
491             if (shutdownInputCause != null) {
492                 logger.debug("Exception suppressed because a previous exception occurred.",
493                         shutdownInputCause);
494             }
495             promise.setFailure(shutdownOutputCause);
496         } else if (shutdownInputCause != null) {
497             promise.setFailure(shutdownInputCause);
498         } else {
499             promise.setSuccess();
500         }
501     }
502 
503     class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
504         // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
505         @Override
506         protected Executor prepareToClose() {
507             return super.prepareToClose();
508         }
509 
510         @Override
511         void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
512             final ChannelConfig config = config();
513             if (shouldBreakReadReady(config)) {
514                 clearReadFilter0();
515                 return;
516             }
517             final ChannelPipeline pipeline = pipeline();
518             final ByteBufAllocator allocator = config.getAllocator();
519             allocHandle.reset(config);
520             readReadyBefore();
521 
522             ByteBuf byteBuf = null;
523             boolean close = false;
524             try {
525                 do {
526                     // we use a direct buffer here as the native implementations only be able
527                     // to handle direct buffers.
528                     byteBuf = allocHandle.allocate(allocator);
529                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
530                     if (allocHandle.lastBytesRead() <= 0) {
531                         // nothing was read, release the buffer.
532                         byteBuf.release();
533                         byteBuf = null;
534                         close = allocHandle.lastBytesRead() < 0;
535                         if (close) {
536                             // There is nothing left to read as we received an EOF.
537                             readPending = false;
538                         }
539                         break;
540                     }
541                     allocHandle.incMessagesRead(1);
542                     readPending = false;
543                     pipeline.fireChannelRead(byteBuf);
544                     byteBuf = null;
545 
546                     if (shouldBreakReadReady(config)) {
547                         // We need to do this for two reasons:
548                         //
549                         // - If the input was shutdown in between (which may be the case when the user did it in the
550                         //   fireChannelRead(...) method we should not try to read again to not produce any
551                         //   miss-leading exceptions.
552                         //
553                         // - If the user closes the channel we need to ensure we not try to read from it again as
554                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
555                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
556                         //   reading data from a filedescriptor that belongs to another socket then the socket that
557                         //   was "wrapped" by this Channel implementation.
558                         break;
559                     }
560                 } while (allocHandle.continueReading());
561 
562                 allocHandle.readComplete();
563                 pipeline.fireChannelReadComplete();
564 
565                 if (close) {
566                     shutdownInput(false);
567                 }
568             } catch (Throwable t) {
569                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
570             } finally {
571                 readReadyFinally(config);
572             }
573         }
574 
575         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
576                                          KQueueRecvByteAllocatorHandle allocHandle) {
577             if (byteBuf != null) {
578                 if (byteBuf.isReadable()) {
579                     readPending = false;
580                     pipeline.fireChannelRead(byteBuf);
581                 } else {
582                     byteBuf.release();
583                 }
584             }
585             allocHandle.readComplete();
586             pipeline.fireChannelReadComplete();
587             pipeline.fireExceptionCaught(cause);
588             if (close || cause instanceof IOException) {
589                 shutdownInput(false);
590             }
591         }
592     }
593 
594     private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
595         KQueueSocketWritableByteChannel() {
596             super(socket);
597         }
598 
599         @Override
600         protected ByteBufAllocator alloc() {
601             return AbstractKQueueStreamChannel.this.alloc();
602         }
603     }
604 }