View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.socket.InternetProtocolFamily;
22  import io.netty.channel.socket.ServerSocketChannel;
23  import io.netty.channel.socket.SocketChannel;
24  import io.netty.channel.unix.IovArray;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  
27  import java.net.InetSocketAddress;
28  import java.net.SocketAddress;
29  import java.util.concurrent.Executor;
30  
31  public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
32      private final KQueueSocketChannelConfig config;
33  
34      public KQueueSocketChannel() {
35          super(null, BsdSocket.newSocketStream(), false);
36          config = new KQueueSocketChannelConfig(this);
37      }
38  
39      public KQueueSocketChannel(InternetProtocolFamily protocol) {
40          super(null, BsdSocket.newSocketStream(protocol), false);
41          config = new KQueueSocketChannelConfig(this);
42      }
43  
44      public KQueueSocketChannel(int fd) {
45          super(new BsdSocket(fd));
46          config = new KQueueSocketChannelConfig(this);
47      }
48  
49      KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
50          super(parent, fd, remoteAddress);
51          config = new KQueueSocketChannelConfig(this);
52      }
53  
54      @Override
55      public InetSocketAddress remoteAddress() {
56          return (InetSocketAddress) super.remoteAddress();
57      }
58  
59      @Override
60      public InetSocketAddress localAddress() {
61          return (InetSocketAddress) super.localAddress();
62      }
63  
64      @Override
65      public KQueueSocketChannelConfig config() {
66          return config;
67      }
68  
69      @Override
70      public ServerSocketChannel parent() {
71          return (ServerSocketChannel) super.parent();
72      }
73  
74      @Override
75      protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
76          if (config.isTcpFastOpenConnect()) {
77              ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
78              outbound.addFlush();
79              Object curr;
80              if ((curr = outbound.current()) instanceof ByteBuf) {
81                  ByteBuf initialData = (ByteBuf) curr;
82                  // Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
83                  if (initialData.isReadable()) {
84                      IovArray iov = new IovArray(config.getAllocator().directBuffer());
85                      try {
86                          iov.add(initialData, initialData.readerIndex(), initialData.readableBytes());
87                          int bytesSent = socket.connectx(
88                                  (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
89                          writeFilter(true);
90                          outbound.removeBytes(Math.abs(bytesSent));
91                          // The `connectx` method returns a negative number if connection is in-progress.
92                          // So we should return `true` to indicate that connection was established, if it's positive.
93                          return bytesSent > 0;
94                      } finally {
95                          iov.release();
96                      }
97                  }
98              }
99          }
100         return super.doConnect0(remoteAddress, localAddress);
101     }
102 
103     @Override
104     protected AbstractKQueueUnsafe newUnsafe() {
105         return new KQueueSocketChannelUnsafe();
106     }
107 
108     private final class KQueueSocketChannelUnsafe extends KQueueStreamUnsafe {
109         @Override
110         protected Executor prepareToClose() {
111             try {
112                 // Check isOpen() first as otherwise it will throw a RuntimeException
113                 // when call getSoLinger() as the fd is not valid anymore.
114                 if (isOpen() && config().getSoLinger() > 0) {
115                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
116                     // because we try to read or write until the actual close happens which may be later due
117                     // SO_LINGER handling.
118                     // See https://github.com/netty/netty/issues/4449
119                     ((KQueueEventLoop) eventLoop()).remove(KQueueSocketChannel.this);
120                     return GlobalEventExecutor.INSTANCE;
121                 }
122             } catch (Throwable ignore) {
123                 // Ignore the error as the underlying channel may be closed in the meantime and so
124                 // getSoLinger() may produce an exception. In this case we just return null.
125                 // See https://github.com/netty/netty/issues/4449
126             }
127             return null;
128         }
129     }
130 }