/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.socket.InternetProtocolFamily;
22  import io.netty.channel.socket.ServerSocketChannel;
23  import io.netty.channel.socket.SocketChannel;
24  import io.netty.channel.socket.SocketProtocolFamily;
25  import io.netty.channel.unix.IovArray;
26  import io.netty.util.concurrent.GlobalEventExecutor;
27  
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  import java.util.concurrent.Executor;
31  
32  public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
33      private final KQueueSocketChannelConfig config;
34  
35      public KQueueSocketChannel() {
36          super(null, BsdSocket.newSocketStream(), false);
37          config = new KQueueSocketChannelConfig(this);
38      }
39  
40      /**
41       * @deprecated use {@link KQueueDatagramChannel(SocketProtocolFamily)}
42       */
43      @Deprecated
44      public KQueueSocketChannel(InternetProtocolFamily protocol) {
45          super(null, BsdSocket.newSocketStream(protocol), false);
46          config = new KQueueSocketChannelConfig(this);
47      }
48  
49      public KQueueSocketChannel(SocketProtocolFamily protocol) {
50          super(null, BsdSocket.newSocketStream(protocol), false);
51          config = new KQueueSocketChannelConfig(this);
52      }
53  
54      public KQueueSocketChannel(int fd) {
55          super(new BsdSocket(fd));
56          config = new KQueueSocketChannelConfig(this);
57      }
58  
59      KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
60          super(parent, fd, remoteAddress);
61          config = new KQueueSocketChannelConfig(this);
62      }
63  
64      @Override
65      public InetSocketAddress remoteAddress() {
66          return (InetSocketAddress) super.remoteAddress();
67      }
68  
69      @Override
70      public InetSocketAddress localAddress() {
71          return (InetSocketAddress) super.localAddress();
72      }
73  
74      @Override
75      public KQueueSocketChannelConfig config() {
76          return config;
77      }
78  
79      @Override
80      public ServerSocketChannel parent() {
81          return (ServerSocketChannel) super.parent();
82      }
83  
84      @Override
85      protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
86          if (config.isTcpFastOpenConnect()) {
87              ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
88              outbound.addFlush();
89              Object curr;
90              if ((curr = outbound.current()) instanceof ByteBuf) {
91                  ByteBuf initialData = (ByteBuf) curr;
92                  // Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
93                  if (initialData.isReadable()) {
94                      IovArray iov = new IovArray(config.getAllocator().directBuffer());
95                      try {
96                          iov.add(initialData, initialData.readerIndex(), initialData.readableBytes());
97                          int bytesSent = socket.connectx(
98                                  (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
99                          writeFilter(true);
100                         outbound.removeBytes(Math.abs(bytesSent));
101                         // The `connectx` method returns a negative number if connection is in-progress.
102                         // So we should return `true` to indicate that connection was established, if it's positive.
103                         return bytesSent > 0;
104                     } finally {
105                         iov.release();
106                     }
107                 }
108             }
109         }
110         return super.doConnect0(remoteAddress, localAddress);
111     }
112 
113     @Override
114     protected AbstractKQueueUnsafe newUnsafe() {
115         return new KQueueSocketChannelUnsafe();
116     }
117 
118     private final class KQueueSocketChannelUnsafe extends KQueueStreamUnsafe {
119         @Override
120         protected Executor prepareToClose() {
121             try {
122                 // Check isOpen() first as otherwise it will throw a RuntimeException
123                 // when call getSoLinger() as the fd is not valid anymore.
124                 if (isOpen() && config().getSoLinger() > 0) {
125                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
126                     // because we try to read or write until the actual close happens which may be later due
127                     // SO_LINGER handling.
128                     // See https://github.com/netty/netty/issues/4449
129                     doDeregister();
130                     return GlobalEventExecutor.INSTANCE;
131                 }
132             } catch (Throwable ignore) {
133                 // Ignore the error as the underlying channel may be closed in the meantime and so
134                 // getSoLinger() may produce an exception. In this case we just return null.
135                 // See https://github.com/netty/netty/issues/4449
136             }
137             return null;
138         }
139     }
140 }