View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.socket.InternetProtocolFamily;
22  import io.netty.channel.socket.ServerSocketChannel;
23  import io.netty.channel.socket.SocketChannel;
24  import io.netty.channel.unix.IovArray;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  import java.util.concurrent.Executor;
31  
32  @UnstableApi
33  public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
34      private final KQueueSocketChannelConfig config;
35  
36      public KQueueSocketChannel() {
37          super(null, BsdSocket.newSocketStream(), false);
38          config = new KQueueSocketChannelConfig(this);
39      }
40  
41      public KQueueSocketChannel(InternetProtocolFamily protocol) {
42          super(null, BsdSocket.newSocketStream(protocol), false);
43          config = new KQueueSocketChannelConfig(this);
44      }
45  
46      public KQueueSocketChannel(int fd) {
47          super(new BsdSocket(fd));
48          config = new KQueueSocketChannelConfig(this);
49      }
50  
51      KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
52          super(parent, fd, remoteAddress);
53          config = new KQueueSocketChannelConfig(this);
54      }
55  
56      @Override
57      public InetSocketAddress remoteAddress() {
58          return (InetSocketAddress) super.remoteAddress();
59      }
60  
61      @Override
62      public InetSocketAddress localAddress() {
63          return (InetSocketAddress) super.localAddress();
64      }
65  
66      @Override
67      public KQueueSocketChannelConfig config() {
68          return config;
69      }
70  
71      @Override
72      public ServerSocketChannel parent() {
73          return (ServerSocketChannel) super.parent();
74      }
75  
76      @Override
77      protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
78          if (config.isTcpFastOpenConnect()) {
79              ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
80              outbound.addFlush();
81              Object curr;
82              if ((curr = outbound.current()) instanceof ByteBuf) {
83                  ByteBuf initialData = (ByteBuf) curr;
84                  // Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
85                  if (initialData.isReadable()) {
86                      IovArray iov = new IovArray(config.getAllocator().directBuffer());
87                      try {
88                          iov.add(initialData, initialData.readerIndex(), initialData.readableBytes());
89                          int bytesSent = socket.connectx(
90                                  (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
91                          writeFilter(true);
92                          outbound.removeBytes(Math.abs(bytesSent));
93                          // The `connectx` method returns a negative number if connection is in-progress.
94                          // So we should return `true` to indicate that connection was established, if it's positive.
95                          return bytesSent > 0;
96                      } finally {
97                          iov.release();
98                      }
99                  }
100             }
101         }
102         return super.doConnect0(remoteAddress, localAddress);
103     }
104 
105     @Override
106     protected AbstractKQueueUnsafe newUnsafe() {
107         return new KQueueSocketChannelUnsafe();
108     }
109 
110     private final class KQueueSocketChannelUnsafe extends KQueueStreamUnsafe {
111         @Override
112         protected Executor prepareToClose() {
113             try {
114                 // Check isOpen() first as otherwise it will throw a RuntimeException
115                 // when call getSoLinger() as the fd is not valid anymore.
116                 if (isOpen() && config().getSoLinger() > 0) {
117                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
118                     // because we try to read or write until the actual close happens which may be later due
119                     // SO_LINGER handling.
120                     // See https://github.com/netty/netty/issues/4449
121                     ((KQueueEventLoop) eventLoop()).remove(KQueueSocketChannel.this);
122                     return GlobalEventExecutor.INSTANCE;
123                 }
124             } catch (Throwable ignore) {
125                 // Ignore the error as the underlying channel may be closed in the meantime and so
126                 // getSoLinger() may produce an exception. In this case we just return null.
127                 // See https://github.com/netty/netty/issues/4449
128             }
129             return null;
130         }
131     }
132 }