View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.kqueue;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.AdaptiveRecvBufferAllocator;
21  import io.netty5.channel.ChannelException;
22  import io.netty5.channel.ChannelMetadata;
23  import io.netty5.channel.ChannelOption;
24  import io.netty5.channel.ChannelOutboundBuffer;
25  import io.netty5.channel.ChannelPipeline;
26  import io.netty5.channel.ChannelShutdownDirection;
27  import io.netty5.channel.DefaultFileRegion;
28  import io.netty5.channel.EventLoop;
29  import io.netty5.channel.FileRegion;
30  import io.netty5.channel.RecvBufferAllocator;
31  import io.netty5.channel.internal.ChannelUtils;
32  import io.netty5.channel.socket.SocketChannel;
33  import io.netty5.channel.socket.SocketProtocolFamily;
34  import io.netty5.channel.unix.DomainSocketReadMode;
35  import io.netty5.channel.unix.FileDescriptor;
36  import io.netty5.channel.unix.IntegerUnixChannelOption;
37  import io.netty5.channel.unix.IovArray;
38  import io.netty5.channel.unix.PeerCredentials;
39  import io.netty5.channel.unix.RawUnixChannelOption;
40  import io.netty5.channel.unix.SocketWritableByteChannel;
41  import io.netty5.channel.unix.UnixChannel;
42  import io.netty5.channel.unix.UnixChannelOption;
43  import io.netty5.channel.unix.UnixChannelUtil;
44  import io.netty5.util.Resource;
45  import io.netty5.util.concurrent.Future;
46  import io.netty5.util.concurrent.GlobalEventExecutor;
47  import io.netty5.util.internal.PlatformDependent;
48  import io.netty5.util.internal.StringUtil;
49  import io.netty5.util.internal.UnstableApi;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.net.ProtocolFamily;
54  import java.net.SocketAddress;
55  import java.nio.ByteBuffer;
56  import java.nio.channels.NotYetConnectedException;
57  import java.nio.channels.WritableByteChannel;
58  import java.util.Set;
59  import java.util.concurrent.Executor;
60  import java.util.function.Predicate;
61  
62  import static io.netty5.channel.ChannelOption.IP_TOS;
63  import static io.netty5.channel.ChannelOption.SO_KEEPALIVE;
64  import static io.netty5.channel.ChannelOption.SO_LINGER;
65  import static io.netty5.channel.ChannelOption.SO_RCVBUF;
66  import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
67  import static io.netty5.channel.ChannelOption.SO_SNDBUF;
68  import static io.netty5.channel.ChannelOption.TCP_NODELAY;
69  import static io.netty5.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
70  import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
71  import static io.netty5.channel.kqueue.KQueueChannelOption.SO_SNDLOWAT;
72  import static io.netty5.channel.kqueue.KQueueChannelOption.TCP_NOPUSH;
73  import static io.netty5.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
74  import static java.util.Objects.requireNonNull;
75  
76  /**
77   * {@link SocketChannel} implementation that uses KQueue.
78   *
79   * <h3>Available options</h3>
80   *
81   * In addition to the options provided by {@link SocketChannel} and {@link UnixChannel},
82   * {@link KQueueSocketChannel} allows the following options in the option map:
83   * <table border="1" cellspacing="0" cellpadding="6">
84   * <tr>
85   * <th>{@link ChannelOption}</th>
86   * <th>{@code INET}</th>
87   * <th>{@code INET6}</th>
88   * <th>{@code UNIX}</th>
89   * </tr><tr>
90   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
91   * </tr><tr>
92   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
93   * </tr><tr>
94   * <td>{@link KQueueChannelOption#SO_SNDLOWAT}</td><td>X</td><td>X</td><td>-</td>
95   * </tr><tr>
96   * <td>{@link KQueueChannelOption#TCP_NOPUSH}</td><td>X</td><td>X</td><td>-</td>
97   * </tr><tr>
98   * <td>{@link ChannelOption#TCP_FASTOPEN_CONNECT}</td><td>X</td><td>X</td><td>-</td>
99   * </tr>
100  * </table>
101  */
102 @UnstableApi
103 public final class KQueueSocketChannel
104         extends AbstractKQueueChannel<KQueueServerSocketChannel>
105         implements SocketChannel {
    // Options handled by this class for non-UNIX sockets (see isSupported(...)).
    private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
    // Options handled by this class when the socket's protocol family is UNIX (see isSupported(...)).
    private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();

    // Metadata handed to the super constructor by every constructor of this class.
    // NOTE(review): the meaning of (false, 16) is defined by ChannelMetadata, which is not visible here - confirm.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
    // Created lazily by writeFileRegion(...) the first time a generic FileRegion has to be transferred.
    private WritableByteChannel byteChannel;

    // Invokes writeFlushed directly so that we do not try to flush messages that were added via write(...) in the
    // meantime. doWrite(...) schedules this task on the executor once the write-spin quantum has been used up.
    private final Runnable flushTask = this::writeFlushed;

    // Read mode used by readReady(...) for UNIX sockets; changed through the DOMAIN_SOCKET_READ_MODE option.
    private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;

    // Backing flag for the TCP_FASTOPEN_CONNECT option; consulted by doConnect0(...).
    private volatile boolean tcpFastopen;
122 
    /**
     * Creates a new channel on the given {@link EventLoop} without naming a protocol family.
     * <p>
     * Delegates to {@link #KQueueSocketChannel(EventLoop, ProtocolFamily)} with a {@code null} family.
     * NOTE(review): how {@code null} is resolved is decided by {@code BsdSocket.newSocket(...)} - confirm there.
     *
     * @param eventLoop the event loop passed on to the super constructor.
     */
    public KQueueSocketChannel(EventLoop eventLoop) {
        this(eventLoop, (ProtocolFamily) null);
    }
126 
    /**
     * Creates a new channel backed by a freshly created socket of the given protocol family.
     * <p>
     * After the super constructor returns, {@code TCP_NODELAY} is enabled where supported and the gathering-write
     * limit is derived from the socket's send buffer size.
     *
     * @param eventLoop      the event loop passed on to the super constructor.
     * @param protocolFamily the family handed to {@code BsdSocket.newSocket(...)}.
     */
    public KQueueSocketChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
        // NOTE(review): the trailing 'false' presumably marks the channel as not yet active - confirm in the parent.
        super(null, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), BsdSocket.newSocket(protocolFamily), false);
        enableTcpNoDelayIfSupported();
        calculateMaxBytesPerGatheringWrite();
    }
132 
    /**
     * Creates a new channel that wraps an already existing file descriptor.
     * <p>
     * The descriptor is wrapped in a {@code BsdSocket} together with the {@link SocketProtocolFamily} derived from
     * {@code protocolFamily}, and construction continues in the private {@code BsdSocket}-based constructor.
     *
     * @param eventLoop      the event loop passed on to the super constructor.
     * @param fd             the raw file descriptor number to wrap.
     * @param protocolFamily the protocol family of the descriptor.
     */
    public KQueueSocketChannel(EventLoop eventLoop, int fd, ProtocolFamily protocolFamily) {
        this(eventLoop, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)));
    }
136 
    /**
     * Creates a new channel around an existing {@code BsdSocket}.
     * <p>
     * The result of {@code isSoErrorZero(fd)} is forwarded as the last super-constructor argument; afterwards
     * {@code TCP_NODELAY} is enabled where supported and the gathering-write limit is calculated.
     *
     * @param eventLoop the event loop passed on to the super constructor.
     * @param fd        the socket to use.
     */
    private KQueueSocketChannel(EventLoop eventLoop, BsdSocket fd) {
        // NOTE(review): isSoErrorZero(fd) presumably decides whether the channel starts out active - confirm.
        super(null, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), fd, isSoErrorZero(fd));
        enableTcpNoDelayIfSupported();
        calculateMaxBytesPerGatheringWrite();
    }
142 
    /**
     * Creates a new channel with a parent server channel and a known remote address.
     * <p>
     * NOTE(review): presumably used for connections accepted by {@code parent} - confirm against the caller.
     * As in the other constructors, {@code TCP_NODELAY} is enabled where supported and the gathering-write limit
     * is calculated once the super constructor has returned.
     *
     * @param parent        the parent server channel.
     * @param eventLoop     the event loop passed on to the super constructor.
     * @param fd            the socket to use.
     * @param remoteAddress the remote address passed on to the super constructor.
     */
    KQueueSocketChannel(KQueueServerSocketChannel parent, EventLoop eventLoop,
                        BsdSocket fd, SocketAddress remoteAddress) {
        super(parent, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), fd, remoteAddress);
        enableTcpNoDelayIfSupported();
        calculateMaxBytesPerGatheringWrite();
    }
149 
150     private void enableTcpNoDelayIfSupported() {
151         if (socket.protocolFamily() != SocketProtocolFamily.UNIX && PlatformDependent.canEnableTcpNoDelayByDefault()) {
152             setTcpNoDelay(true);
153         }
154     }
155 
156     @SuppressWarnings("unchecked")
157     @Override
158     protected <T> T getExtendedOption(ChannelOption<T> option) {
159         if (isSupported(socket.protocolFamily(), option)) {
160             if (option == SO_RCVBUF) {
161                 return (T) Integer.valueOf(getReceiveBufferSize());
162             }
163             if (option == SO_SNDBUF) {
164                 return (T) Integer.valueOf(getSendBufferSize());
165             }
166             if (option == TCP_NODELAY) {
167                 return (T) Boolean.valueOf(isTcpNoDelay());
168             }
169             if (option == SO_KEEPALIVE) {
170                 return (T) Boolean.valueOf(isKeepAlive());
171             }
172             if (option == SO_REUSEADDR) {
173                 return (T) Boolean.valueOf(isReuseAddress());
174             }
175             if (option == SO_LINGER) {
176                 return (T) Integer.valueOf(getSoLinger());
177             }
178             if (option == IP_TOS) {
179                 return (T) Integer.valueOf(getTrafficClass());
180             }
181             if (option == SO_SNDLOWAT) {
182                 return (T) Integer.valueOf(getSndLowAt());
183             }
184             if (option == TCP_NOPUSH) {
185                 return (T) Boolean.valueOf(isTcpNoPush());
186             }
187             if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
188                 return (T) Boolean.valueOf(isTcpFastOpenConnect());
189             }
190             if (option == DOMAIN_SOCKET_READ_MODE) {
191                 return (T) getReadMode();
192             }
193             if (option == UnixChannelOption.SO_PEERCRED) {
194                 return (T) getPeerCredentials();
195             }
196         }
197         return super.getExtendedOption(option);
198     }
199 
200     @Override
201     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
202         if (isSupported(socket.protocolFamily(), option)) {
203             if (option == SO_RCVBUF) {
204                 setReceiveBufferSize((Integer) value);
205             } else if (option == SO_SNDBUF) {
206                 setSendBufferSize((Integer) value);
207             } else if (option == TCP_NODELAY) {
208                 setTcpNoDelay((Boolean) value);
209             } else if (option == SO_KEEPALIVE) {
210                 setKeepAlive((Boolean) value);
211             } else if (option == SO_REUSEADDR) {
212                 setReuseAddress((Boolean) value);
213             } else if (option == SO_LINGER) {
214                 setSoLinger((Integer) value);
215             } else if (option == IP_TOS) {
216                 setTrafficClass((Integer) value);
217             } else if (option == SO_SNDLOWAT) {
218                 setSndLowAt((Integer) value);
219             } else if (option == TCP_NOPUSH) {
220                 setTcpNoPush((Boolean) value);
221             } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
222                 setTcpFastOpenConnect((Boolean) value);
223             } else if (option == DOMAIN_SOCKET_READ_MODE) {
224                 setReadMode((DomainSocketReadMode) value);
225             } else if (option == UnixChannelOption.SO_PEERCRED) {
226                 throw new UnsupportedOperationException("read-only option: " + option);
227             }
228         } else {
229             super.setExtendedOption(option, value);
230         }
231     }
232 
233     private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
234         if (protocolFamily == SocketProtocolFamily.UNIX) {
235             return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
236         }
237         return SUPPORTED_OPTIONS.contains(option);
238     }
239 
240     @Override
241     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
242         return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
243     }
244 
245     private static Set<ChannelOption<?>> supportedOptions() {
246         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, TCP_NODELAY,
247                 SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, SO_SNDLOWAT, TCP_NOPUSH,
248                 ChannelOption.TCP_FASTOPEN_CONNECT);
249     }
250 
251     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
252         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, DOMAIN_SOCKET_READ_MODE,
253                 UnixChannelOption.SO_PEERCRED);
254     }
255 
256     private void setReadMode(DomainSocketReadMode mode) {
257         requireNonNull(mode, "mode");
258         this.mode = mode;
259     }
260 
261     private DomainSocketReadMode getReadMode() {
262         return mode;
263     }
264 
265     private int getReceiveBufferSize() {
266         try {
267             return socket.getReceiveBufferSize();
268         } catch (IOException e) {
269             throw new ChannelException(e);
270         }
271     }
272 
273     private int getSendBufferSize() {
274         try {
275             return socket.getSendBufferSize();
276         } catch (IOException e) {
277             throw new ChannelException(e);
278         }
279     }
280 
281     private int getSoLinger() {
282         try {
283             return socket.getSoLinger();
284         } catch (IOException e) {
285             throw new ChannelException(e);
286         }
287     }
288 
289     private int getTrafficClass() {
290         try {
291             return socket.getTrafficClass();
292         } catch (IOException e) {
293             throw new ChannelException(e);
294         }
295     }
296 
297     private boolean isKeepAlive() {
298         try {
299             return socket.isKeepAlive();
300         } catch (IOException e) {
301             throw new ChannelException(e);
302         }
303     }
304 
305     private boolean isReuseAddress() {
306         try {
307             return socket.isReuseAddress();
308         } catch (IOException e) {
309             throw new ChannelException(e);
310         }
311     }
312 
313     private boolean isTcpNoDelay() {
314         try {
315             return socket.isTcpNoDelay();
316         } catch (IOException e) {
317             throw new ChannelException(e);
318         }
319     }
320 
321     private int getSndLowAt() {
322         try {
323             return socket.getSndLowAt();
324         } catch (IOException e) {
325             throw new ChannelException(e);
326         }
327     }
328 
329     private void setSndLowAt(int sndLowAt)  {
330         try {
331             socket.setSndLowAt(sndLowAt);
332         } catch (IOException e) {
333             throw new ChannelException(e);
334         }
335     }
336 
337     private boolean isTcpNoPush() {
338         try {
339             return socket.isTcpNoPush();
340         } catch (IOException e) {
341             throw new ChannelException(e);
342         }
343     }
344 
345     private void setTcpNoPush(boolean tcpNoPush)  {
346         try {
347             socket.setTcpNoPush(tcpNoPush);
348         } catch (IOException e) {
349             throw new ChannelException(e);
350         }
351     }
352 
353     private void setKeepAlive(boolean keepAlive) {
354         try {
355             socket.setKeepAlive(keepAlive);
356         } catch (IOException e) {
357             throw new ChannelException(e);
358         }
359     }
360 
361     private void setReceiveBufferSize(int receiveBufferSize) {
362         try {
363             socket.setReceiveBufferSize(receiveBufferSize);
364         } catch (IOException e) {
365             throw new ChannelException(e);
366         }
367     }
368 
369     private void setReuseAddress(boolean reuseAddress) {
370         try {
371             socket.setReuseAddress(reuseAddress);
372         } catch (IOException e) {
373             throw new ChannelException(e);
374         }
375     }
376 
377     private void setSendBufferSize(int sendBufferSize) {
378         try {
379             socket.setSendBufferSize(sendBufferSize);
380             calculateMaxBytesPerGatheringWrite();
381         } catch (IOException e) {
382             throw new ChannelException(e);
383         }
384     }
385 
386     private void setSoLinger(int soLinger) {
387         try {
388             socket.setSoLinger(soLinger);
389         } catch (IOException e) {
390             throw new ChannelException(e);
391         }
392     }
393 
394     private void setTcpNoDelay(boolean tcpNoDelay) {
395         try {
396             socket.setTcpNoDelay(tcpNoDelay);
397         } catch (IOException e) {
398             throw new ChannelException(e);
399         }
400     }
401 
402     private void setTrafficClass(int trafficClass) {
403         try {
404             socket.setTrafficClass(trafficClass);
405         } catch (IOException e) {
406             throw new ChannelException(e);
407         }
408     }
409 
410     /**
411      * Enables client TCP fast open, if available.
412      */
413     private void setTcpFastOpenConnect(boolean fastOpenConnect) {
414         tcpFastopen = fastOpenConnect;
415     }
416 
417     /**
418      * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
419      */
420     private boolean isTcpFastOpenConnect() {
421         return tcpFastopen;
422     }
423 
424     private void calculateMaxBytesPerGatheringWrite() {
425         // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
426         int newSendBufferSize = getSendBufferSize() << 1;
427         if (newSendBufferSize > 0) {
428             setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
429         }
430     }
431 
432     @Override
433     protected Object filterOutboundMessage(Object msg) {
434         if (socket.protocolFamily() == SocketProtocolFamily.UNIX && msg instanceof FileDescriptor) {
435             return msg;
436         }
437         if (msg instanceof Buffer) {
438             Buffer buf = (Buffer) msg;
439             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
440         }
441 
442         if (msg instanceof FileRegion) {
443             return msg;
444         }
445 
446         throw new UnsupportedOperationException(
447                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
448     }
449 
450     @Override
451     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
452         if (isTcpFastOpenConnect()) {
453             ChannelOutboundBuffer outbound = outboundBuffer();
454             outbound.addFlush();
455             Object curr;
456             if ((curr = outbound.current()) instanceof Buffer) {
457                 Buffer initialData = (Buffer) curr;
458                 // Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
459                 if (initialData.readableBytes() > 0) {
460                     IovArray iov = new IovArray();
461                     try {
462                         initialData.forEachReadable(0, iov);
463                         int bytesSent = socket.connectx(
464                                 (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
465                         writeFilter(true);
466                         outbound.removeBytes(Math.abs(bytesSent));
467                         // The `connectx` method returns a negative number if connection is in-progress.
468                         // So we should return `true` to indicate that connection was established, if it's positive.
469                         return bytesSent > 0;
470                     } finally {
471                         iov.release();
472                     }
473                 }
474             }
475         }
476         return super.doConnect0(remoteAddress, localAddress);
477     }
478 
479     @Override
480     protected Future<Executor> prepareToClose() {
481         if (socket.protocolFamily() != SocketProtocolFamily.UNIX) {
482             try {
483                 // Check isOpen() first as otherwise it will throw a RuntimeException
484                 // when call getSoLinger() as the fd is not valid anymore.
485                 if (isOpen() && getSoLinger() > 0) {
486                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
487                     // because we try to read or write until the actual close happens which may be later due
488                     // SO_LINGER handling.
489                     // See https://github.com/netty/netty/issues/4449
490                     return executor().deregisterForIo(this).map(v -> GlobalEventExecutor.INSTANCE);
491                 }
492             } catch (Throwable ignore) {
493                 // Ignore the error as the underlying channel may be closed in the meantime and so
494                 // getSoLinger() may produce an exception. In this case we just return null.
495                 // See https://github.com/netty/netty/issues/4449
496             }
497         }
498         return null;
499     }
500 
501     @Override
502     void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
503                    Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
504         if (socket.protocolFamily() == SocketProtocolFamily.UNIX &&
505                 getReadMode() == DomainSocketReadMode.FILE_DESCRIPTORS) {
506             readReadyFd(allocHandle);
507         } else {
508             readReadyBytes(allocHandle, recvBufferAllocator, maybeMoreData);
509         }
510     }
511 
    /**
     * Reads file descriptors from the socket and fires each one through the pipeline as a
     * {@link FileDescriptor} message.
     * <p>
     * The loop keeps calling {@code socket.recvFd()} while {@code allocHandle.continueReading(isAutoRead())}
     * holds and the inbound direction has not been shut down. When the loop ends, the read is completed on the
     * handle and {@code channelReadComplete} is fired. An exception does the same and additionally fires
     * {@code channelExceptionCaught}. {@code readIfIsAutoRead()} always runs last.
     *
     * @param allocHandle the receive allocator handle that tracks this read.
     */
    private void readReadyFd(RecvBufferAllocator.Handle allocHandle) {
        final ChannelPipeline pipeline = pipeline();
        try {
            readLoop: do {
                // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
                // KQueueRecvBufferAllocatorHandle knows if it should try to read again or not when autoRead is
                // enabled.
                int recvFd = socket.recvFd();
                switch(recvFd) {
                    case 0:
                        // Nothing was received: record zero and leave the loop, then complete the read below.
                        // NOTE(review): presumably 0 means "no descriptor available right now" - confirm recvFd().
                        allocHandle.lastBytesRead(0);
                        break readLoop;
                    case -1:
                        // Record -1 and close the transport; returning here skips the read-complete events.
                        // NOTE(review): presumably -1 signals end-of-stream from the peer - confirm recvFd().
                        allocHandle.lastBytesRead(-1);
                        closeTransportNow();
                        return;
                    default:
                        // Any other value is treated as a received descriptor and counted as one message.
                        allocHandle.lastBytesRead(1);
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(new FileDescriptor(recvFd));
                        break;
                }
            } while (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        } catch (Throwable t) {
            // Complete the read before reporting the failure to the pipeline.
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireChannelExceptionCaught(t);
        } finally {
            readIfIsAutoRead();
        }
    }
547 
548     private PeerCredentials getPeerCredentials() {
549         try {
550             return socket.getPeerCredentials();
551         } catch (IOException e) {
552             throw new ChannelException(e);
553         }
554     }
555 
556     /**
557      * Write bytes form the given {@link Buffer} to the underlying {@link java.nio.channels.Channel}.
558      * @param in the collection which contains objects to write.
559      * @param buf the {@link Buffer} from which the bytes should be written
560      * @return The value that should be decremented from the write-quantum which starts at
561      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
562      * <ul>
563      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
564      *     is encountered</li>
565      *     <li>1 - if a single call to write data was made to the OS</li>
566      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
567      *     no data was accepted</li>
568      * </ul>
569      */
570     private int writeBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
571         int readableBytes = buf.readableBytes();
572         if (readableBytes == 0) {
573             in.remove();
574             return 0;
575         }
576 
577         int readableComponents = buf.countReadableComponents();
578         if (readableComponents == 1) {
579             return doWriteBytes(in, buf);
580         }
581         ByteBuffer[] nioBuffers = new ByteBuffer[readableComponents];
582         buf.forEachReadable(0, (index, component) -> {
583             nioBuffers[index] = component.readableBuffer();
584             return true;
585         });
586         return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
587                 getMaxBytesPerGatheringWrite());
588     }
589 
590     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
591         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
592         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
593         // make a best effort to adjust as OS behavior changes.
594         if (attempted == written) {
595             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
596                 setMaxBytesPerGatheringWrite(attempted << 1);
597             }
598         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
599             setMaxBytesPerGatheringWrite(attempted >>> 1);
600         }
601     }
602 
603     /**
604      * Write multiple bytes via {@link IovArray}.
605      * @param in the collection which contains objects to write.
606      * @param array The array which contains the content to write.
607      * @return The value that should be decremented from the write quantum which starts at
608      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
609      * <ul>
610      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
611      *     is encountered</li>
612      *     <li>1 - if a single call to write data was made to the OS</li>
613      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
614      *     no data was accepted</li>
615      * </ul>
616      * @throws IOException If an I/O exception occurs during write.
617      */
618     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
619         final long expectedWrittenBytes = array.size();
620         assert expectedWrittenBytes != 0;
621         final int cnt = array.count();
622         assert cnt != 0;
623 
624         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
625         if (localWrittenBytes > 0) {
626             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
627             in.removeBytes(localWrittenBytes);
628             return 1;
629         }
630         return WRITE_STATUS_SNDBUF_FULL;
631     }
632 
633     /**
634      * Write multiple bytes via {@link ByteBuffer} array.
635      * @param in the collection which contains objects to write.
636      * @param nioBuffers The buffers to write.
637      * @param nioBufferCnt The number of buffers to write.
638      * @param expectedWrittenBytes The number of bytes we expect to write.
639      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
640      * @return The value that should be decremented from the write quantum which starts at
641      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
642      * <ul>
643      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
644      *     is encountered</li>
645      *     <li>1 - if a single call to write data was made to the OS</li>
646      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
647      *     no data was accepted</li>
648      * </ul>
649      * @throws IOException If an I/O exception occurs during write.
650      */
651     private int writeBytesMultiple(
652             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
653             long maxBytesPerGatheringWrite) throws IOException {
654         assert expectedWrittenBytes != 0;
655         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
656             expectedWrittenBytes = maxBytesPerGatheringWrite;
657         }
658 
659         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
660         if (localWrittenBytes > 0) {
661             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
662             in.removeBytes(localWrittenBytes);
663             return 1;
664         }
665         return WRITE_STATUS_SNDBUF_FULL;
666     }
667 
668     /**
669      * Write a {@link DefaultFileRegion}
670      * @param in the collection which contains objects to write.
671      * @param region the {@link DefaultFileRegion} from which the bytes should be written
672      * @return The value that should be decremented from the write quantum which starts at
673      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
674      * <ul>
675      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
676      *     is encountered</li>
677      *     <li>1 - if a single call to write data was made to the OS</li>
678      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
679      *     no data was accepted</li>
680      * </ul>
681      */
682     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
683         final long regionCount = region.count();
684         final long offset = region.transferred();
685 
686         if (offset >= regionCount) {
687             in.remove();
688             return 0;
689         }
690 
691         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
692         if (flushedAmount > 0) {
693             in.progress(flushedAmount);
694             if (region.transferred() >= regionCount) {
695                 in.remove();
696             }
697             return 1;
698         }
699         if (flushedAmount == 0) {
700             validateFileRegion(region, offset);
701         }
702         return WRITE_STATUS_SNDBUF_FULL;
703     }
704 
705     /**
706      * Write a {@link FileRegion}
707      * @param in the collection which contains objects to write.
708      * @param region the {@link FileRegion} from which the bytes should be written
709      * @return The value that should be decremented from the write quantum which starts at
710      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
711      * <ul>
712      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
713      *     is encountered</li>
714      *     <li>1 - if a single call to write data was made to the OS</li>
715      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
716      *     data was accepted</li>
717      * </ul>
718      */
719     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
720         if (region.transferred() >= region.count()) {
721             in.remove();
722             return 0;
723         }
724 
725         if (byteChannel == null) {
726             byteChannel = new KQueueSocketWritableByteChannel();
727         }
728         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
729         if (flushedAmount > 0) {
730             in.progress(flushedAmount);
731             if (region.transferred() >= region.count()) {
732                 in.remove();
733             }
734             return 1;
735         }
736         return WRITE_STATUS_SNDBUF_FULL;
737     }
738 
739     @Override
740     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
741         int writeSpinCount = getWriteSpinCount();
742         do {
743             final int msgCount = in.size();
744             // Do gathering write if the outbound buffer entries start with more than one Buffer.
745             if (msgCount > 1 && in.current() instanceof Buffer) {
746                 writeSpinCount -= doWriteMultiple(in);
747             } else if (msgCount == 0) {
748                 // Wrote all messages.
749                 writeFilter(false);
750                 // Return here so we don't set the WRITE flag.
751                 return;
752             } else { // msgCount == 1
753                 writeSpinCount -= doWriteSingle(in);
754             }
755 
756             // We do not break the loop here even if the outbound buffer was flushed completely,
757             // because a user might have triggered another write and flush when we notify his or her
758             // listeners.
759         } while (writeSpinCount > 0);
760 
761         if (writeSpinCount == 0) {
762             // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
763             // then use our write quantum. In this case we no longer want to set the write filter because the socket is
764             // still writable (as far as we know). We will find out next time we attempt to write if the socket is
765             // writable and set the write filter if necessary.
766             writeFilter(false);
767 
768             // We used our writeSpin quantum, and should try to write again later.
769             executor().execute(flushTask);
770         } else {
771             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
772             // when it can accept more data.
773             writeFilter(true);
774         }
775     }
776 
777     /**
778      * Attempt to write a single object.
779      * @param in the collection which contains objects to write.
780      * @return The value that should be decremented from the write quantum which starts at
781      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
782      * <ul>
783      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
784      *     is encountered</li>
785      *     <li>1 - if a single call to write data was made to the OS</li>
786      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
787      *     data was accepted</li>
788      * </ul>
789      * @throws Exception If an I/O error occurs.
790      */
791     private int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
792         // The outbound buffer contains only one message or it contains a file region.
793         Object msg = in.current();
794         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
795             if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
796                 // File descriptor was written, so remove it.
797                 in.remove();
798                 return 1;
799             }
800         }
801 
802         if (msg instanceof Buffer) {
803             return writeBytes(in, (Buffer) msg);
804         } else if (msg instanceof DefaultFileRegion) {
805             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
806         } else if (msg instanceof FileRegion) {
807             return writeFileRegion(in, (FileRegion) msg);
808         } else {
809             // Should never reach here.
810             throw new Error();
811         }
812     }
813 
814     /**
815      * Attempt to write multiple {@link Buffer} objects.
816      * @param in the collection which contains objects to write.
817      * @return The value that should be decremented from the write quantum which starts at
818      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
819      * <ul>
820      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
821      *     is encountered</li>
822      *     <li>1 - if a single call to write data was made to the OS</li>
823      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
824      *     data was accepted</li>
825      * </ul>
826      * @throws Exception If an I/O error occurs.
827      */
828     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
829         final long maxBytesPerGatheringWrite = getMaxBytesPerGatheringWrite();
830         IovArray array = registration().cleanArray();
831         array.maxBytes(maxBytesPerGatheringWrite);
832         in.forEachFlushedMessage(array);
833 
834         if (array.count() >= 1) {
835             return writeBytesMultiple(in, array);
836         }
837         // cnt == 0, which means the outbound buffer contained empty buffers only.
838         in.removeBytes(0);
839         return 0;
840     }
841 
842     @Override
843     protected void doShutdown(ChannelShutdownDirection direction) throws Exception {
844         switch (direction) {
845             case Outbound:
846                 socket.shutdown(false, true);
847                 break;
848             case Inbound:
849                 try {
850                     socket.shutdown(true, false);
851                 } catch (NotYetConnectedException ignore) {
852                     // We attempted to shutdown and failed, which means the input has already effectively been
853                     // shutdown.
854                 }
855                 break;
856             default:
857                 throw new AssertionError();
858         }
859     }
860 
861     @Override
862     public boolean isShutdown(ChannelShutdownDirection direction) {
863         if (!isActive()) {
864             return true;
865         }
866         switch (direction) {
867             case Outbound:
868                 return socket.isOutputShutdown();
869             case Inbound:
870                 return socket.isInputShutdown();
871             default:
872                 throw new AssertionError();
873         }
874     }
875 
876     private void readReadyBytes(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
877                                 Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
878         final ChannelPipeline pipeline = pipeline();
879         allocHandle.reset();
880         Buffer buffer = null;
881         boolean close = false;
882         try {
883             do {
884                 // we use a direct buffer here as the native implementations only be able
885                 // to handle direct buffers.
886                 buffer = allocHandle.allocate(recvBufferAllocator);
887                 doReadBytes(buffer);
888                 if (allocHandle.lastBytesRead() <= 0) {
889                     // nothing was read, release the buffer.
890                     Resource.dispose(buffer);
891                     buffer = null;
892                     close = allocHandle.lastBytesRead() < 0;
893                     if (close) {
894                         // There is nothing left to read as we received an EOF.
895                         readPending = false;
896                     }
897                     break;
898                 }
899                 allocHandle.incMessagesRead(1);
900                 readPending = false;
901                 pipeline.fireChannelRead(buffer);
902                 buffer = null;
903 
904                 if (shouldBreakReadReady()) {
905                     // We need to do this for two reasons:
906                     //
907                     // - If the input was shutdown in between (which may be the case when the user did it in the
908                     //   fireChannelRead(...) method we should not try to read again to not produce any
909                     //   miss-leading exceptions.
910                     //
911                     // - If the user closes the channel we need to ensure we not try to read from it again as
912                     //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
913                     //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
914                     //   reading data from a filedescriptor that belongs to another socket then the socket that
915                     //   was "wrapped" by this Channel implementation.
916                     break;
917                 }
918             } while (allocHandle.continueReading(isAutoRead(), maybeMoreData)
919                     && !isShutdown(ChannelShutdownDirection.Inbound));
920 
921             allocHandle.readComplete();
922             pipeline.fireChannelReadComplete();
923 
924             if (close) {
925                 shutdownInput(false);
926             } else {
927                 readIfIsAutoRead();
928             }
929         } catch (Throwable t) {
930             handleReadException(pipeline, buffer, t, close, allocHandle);
931         }
932     }
933 
934     private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
935                                      RecvBufferAllocator.Handle allocHandle) {
936         if (buffer.readableBytes() > 0) {
937             readPending = false;
938             pipeline.fireChannelRead(buffer);
939         } else {
940             buffer.close();
941         }
942         if (isConnectPending()) {
943             finishConnect();
944         } else {
945             allocHandle.readComplete();
946             pipeline.fireChannelReadComplete();
947             pipeline.fireChannelExceptionCaught(cause);
948 
949             // If oom will close the read event, release connection.
950             // See https://github.com/netty/netty/issues/10434
951             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
952                 shutdownInput(false);
953             } else {
954                 readIfIsAutoRead();
955             }
956         }
957     }
958 
959     private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
960         KQueueSocketWritableByteChannel() {
961             super(socket);
962         }
963 
964         @Override
965         protected BufferAllocator alloc() {
966             return bufferAllocator();
967         }
968     }
969 }