View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.kqueue;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelException;
21  import io.netty5.channel.ChannelMetadata;
22  import io.netty5.channel.ChannelOption;
23  import io.netty5.channel.ChannelOutboundBuffer;
24  import io.netty5.channel.ChannelPipeline;
25  import io.netty5.channel.ChannelShutdownDirection;
26  import io.netty5.channel.EventLoop;
27  import io.netty5.channel.EventLoopGroup;
28  import io.netty5.channel.RecvBufferAllocator;
29  import io.netty5.channel.ServerChannelRecvBufferAllocator;
30  import io.netty5.channel.socket.DomainSocketAddress;
31  import io.netty5.channel.socket.ServerSocketChannel;
32  import io.netty5.channel.socket.SocketProtocolFamily;
33  import io.netty5.channel.unix.IntegerUnixChannelOption;
34  import io.netty5.channel.unix.RawUnixChannelOption;
35  import io.netty5.channel.unix.UnixChannel;
36  import io.netty5.util.NetUtil;
37  import io.netty5.util.internal.UnstableApi;
38  import io.netty5.util.internal.logging.InternalLogger;
39  import io.netty5.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.File;
42  import java.io.IOException;
43  import java.net.ProtocolFamily;
44  import java.net.SocketAddress;
45  import java.util.Set;
46  import java.util.function.Predicate;
47  
48  import static io.netty5.channel.ChannelOption.SO_BACKLOG;
49  import static io.netty5.channel.ChannelOption.SO_RCVBUF;
50  import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
51  import static io.netty5.channel.ChannelOption.TCP_FASTOPEN;
52  import static io.netty5.channel.kqueue.BsdSocket.newSocketStream;
53  import static io.netty5.channel.kqueue.KQueueChannelOption.SO_ACCEPTFILTER;
54  import static io.netty5.channel.unix.NativeInetAddress.address;
55  import static io.netty5.channel.unix.UnixChannelOption.SO_REUSEPORT;
56  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
57  
58  /**
59   * {@link ServerSocketChannel} implementation that uses KQueue.
60   *
61   * <h3>Available options</h3>
62   *
63   * In addition to the options provided by {@link ServerSocketChannel} and {@link UnixChannel},
64   * {@link KQueueServerSocketChannel} allows the following options in the option map:
65   *
66   * <table border="1" cellspacing="0" cellpadding="6">
67   * <tr>
68   * <th>{@link ChannelOption}</th>
69   * <th>{@code INET}</th>
70   * <th>{@code INET6}</th>
71   * <th>{@code UNIX}</th>
72   * </tr><tr>
73   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
74   * </tr><tr>
75   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
76   * </tr><tr>
77   * <td>{@link ChannelOption#TCP_FASTOPEN}</td><td>X</td><td>X</td><td>-</td>
78   * </tr><tr>
79   * <td>{@link KQueueChannelOption#SO_ACCEPTFILTER}</td><td>X</td><td>X</td><td>X</td>
80   * </tr><tr>
81   * <td>{@link io.netty5.channel.unix.UnixChannelOption#SO_REUSEPORT}</td><td>X</td><td>X</td><td>-</td>
82   * </tr>
83   * </table>
84   */
85  @UnstableApi
86  public final class KQueueServerSocketChannel extends
87          AbstractKQueueChannel<UnixChannel> implements ServerSocketChannel {
88      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
89              KQueueServerSocketChannel.class);
90  
91      private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
92  
93      private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN = supportedOptionsDomainSocket();
94  
95      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
96      private final EventLoopGroup childEventLoopGroup;
97  
98      // Will hold the remote address after accept(...) was successful.
99      // We need 24 bytes for the address as maximum + 1 byte for storing the capacity.
100     // So use 26 bytes as it's a power of two.
101     private final byte[] acceptedAddress = new byte[26];
102 
103     private volatile int backlog = NetUtil.SOMAXCONN;
104     private volatile boolean enableTcpFastOpen;
105 
106     public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
107         super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(),
108                 newSocketStream(), false);
109         this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
110                 KQueueSocketChannel.class);
111     }
112 
113     public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
114                                      SocketProtocolFamily protocolFamily) {
115         super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(),
116                 BsdSocket.newSocket(protocolFamily), false);
117         this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
118                 KQueueSocketChannel.class);
119     }
120 
121     public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
122                                      int fd, ProtocolFamily protocolFamily) {
123         // Must call this constructor to ensure this object's local address is configured correctly.
124         // The local address can only be obtained from a Socket object.
125         this(eventLoop, childEventLoopGroup, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)));
126     }
127 
128     private KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, BsdSocket socket) {
129         // Must call this constructor to ensure this object's local address is configured correctly.
130         // The local address can only be obtained from a Socket object.
131         super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(), socket, isSoErrorZero(socket));
132         this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
133                 KQueueSocketChannel.class);
134     }
135 
136     @Override
137     protected void doBind(SocketAddress localAddress) throws Exception {
138         super.doBind(localAddress);
139         socket.listen(getBacklog());
140         if (socket.protocolFamily() != SocketProtocolFamily.UNIX && isTcpFastOpen()) {
141             socket.setTcpFastOpen(true);
142         }
143         active = true;
144     }
145 
146     @SuppressWarnings("unchecked")
147     @Override
148     protected <T> T getExtendedOption(ChannelOption<T> option) {
149         if (isSupported(socket.protocolFamily(), option)) {
150             if (option == SO_RCVBUF) {
151                 return (T) Integer.valueOf(getReceiveBufferSize());
152             }
153             if (option == SO_REUSEADDR) {
154                 return (T) Boolean.valueOf(isReuseAddress());
155             }
156             if (option == SO_BACKLOG) {
157                 return (T) Integer.valueOf(getBacklog());
158             }
159             if (option == TCP_FASTOPEN) {
160                 return (T) (isTcpFastOpen() ? Integer.valueOf(1) : Integer.valueOf(0));
161             }
162             if (option == SO_REUSEPORT) {
163                 return (T) Boolean.valueOf(isReusePort());
164             }
165             if (option == SO_ACCEPTFILTER) {
166                 return (T) getAcceptFilter();
167             }
168         }
169         return super.getExtendedOption(option);
170     }
171 
172     @Override
173     protected  <T> void setExtendedOption(ChannelOption<T> option, T value) {
174         if (isSupported(socket.protocolFamily(), option)) {
175             if (option == SO_RCVBUF) {
176                 setReceiveBufferSize((Integer) value);
177             } else if (option == SO_REUSEADDR) {
178                 setReuseAddress((Boolean) value);
179             } else if (option == SO_BACKLOG) {
180                 setBacklog((Integer) value);
181             } else if (option == TCP_FASTOPEN) {
182                 setTcpFastOpen((Integer) value > 0);
183             } else if (option == SO_REUSEPORT) {
184                 setReusePort((Boolean) value);
185             } else if (option == SO_ACCEPTFILTER) {
186                 setAcceptFilter((AcceptFilter) value);
187             }
188         } else {
189             super.setExtendedOption(option, value);
190         }
191     }
192 
193     private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
194         if (protocolFamily == SocketProtocolFamily.UNIX) {
195             return SUPPORTED_OPTIONS_DOMAIN.contains(option);
196         }
197         return SUPPORTED_OPTIONS.contains(option);
198     }
199 
200     @Override
201     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
202         return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
203     }
204 
205     private static Set<ChannelOption<?>> supportedOptions() {
206         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG,
207                 TCP_FASTOPEN, SO_REUSEPORT, SO_ACCEPTFILTER);
208     }
209 
210     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
211         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, SO_ACCEPTFILTER);
212     }
213 
214     private boolean isReuseAddress() {
215         try {
216             return socket.isReuseAddress();
217         } catch (IOException e) {
218             throw new ChannelException(e);
219         }
220     }
221 
222     private void setReuseAddress(boolean reuseAddress) {
223         try {
224             socket.setReuseAddress(reuseAddress);
225         } catch (IOException e) {
226             throw new ChannelException(e);
227         }
228     }
229 
230     private int getReceiveBufferSize() {
231         try {
232             return socket.getReceiveBufferSize();
233         } catch (IOException e) {
234             throw new ChannelException(e);
235         }
236     }
237 
238     private void setReceiveBufferSize(int receiveBufferSize) {
239         try {
240             socket.setReceiveBufferSize(receiveBufferSize);
241         } catch (IOException e) {
242             throw new ChannelException(e);
243         }
244     }
245 
246     private int getBacklog() {
247         return backlog;
248     }
249 
250     private void setBacklog(int backlog) {
251         checkPositiveOrZero(backlog, "backlog");
252         this.backlog = backlog;
253     }
254 
255     /**
256      * Returns {@code true} if TCP FastOpen is enabled.
257      *
258      * @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
259      */
260     private boolean isTcpFastOpen() {
261         return enableTcpFastOpen;
262     }
263 
264     /**
265      * Enables TCP FastOpen on the server channel. If the underlying os doesn't support TCP_FASTOPEN setting this has no
266      * effect. This has to be set before doing listen on the socket otherwise this takes no effect.
267      *
268      * @param enableTcpFastOpen {@code true} if TCP FastOpen should be enabled for incomming connections.
269      *
270      * @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
271      */
272     private void setTcpFastOpen(boolean enableTcpFastOpen) {
273         this.enableTcpFastOpen = enableTcpFastOpen;
274     }
275 
276     private void setReusePort(boolean reusePort) {
277         try {
278             socket.setReusePort(reusePort);
279         } catch (IOException e) {
280             throw new ChannelException(e);
281         }
282     }
283 
284     private boolean isReusePort() {
285         try {
286             return socket.isReusePort();
287         } catch (IOException e) {
288             throw new ChannelException(e);
289         }
290     }
291 
292     private void setAcceptFilter(AcceptFilter acceptFilter) {
293         try {
294             socket.setAcceptFilter(acceptFilter);
295         } catch (IOException e) {
296             throw new ChannelException(e);
297         }
298     }
299 
300     private AcceptFilter getAcceptFilter() {
301         try {
302             return socket.getAcceptFilter();
303         } catch (IOException e) {
304             throw new ChannelException(e);
305         }
306     }
307 
308     private Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
309         final SocketAddress remote;
310         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
311             remote = null;
312         } else {
313             remote = address(address, offset, len);
314         }
315         return new KQueueSocketChannel(this, childEventLoopGroup().next(),
316                 new BsdSocket(fd, socket.protocolFamily()), remote);
317     }
318 
319     @Override
320     protected void doClose() throws Exception {
321         SocketAddress local = localAddress();
322         try {
323             super.doClose();
324         } finally {
325             if (local != null && socket.protocolFamily() == SocketProtocolFamily.UNIX) {
326                 String path = ((DomainSocketAddress) local).path();
327                 // Delete the socket file if possible.
328                 File socketFile = new File(path);
329                 boolean success = socketFile.delete();
330                 if (!success && logger.isDebugEnabled()) {
331                     logger.debug("Failed to delete a domain socket file: {}", path);
332                 }
333             }
334         }
335     }
336 
337     @Override
338     public EventLoopGroup childEventLoopGroup() {
339         return childEventLoopGroup;
340     }
341 
342     @Override
343     protected SocketAddress remoteAddress0() {
344         return null;
345     }
346 
347     @Override
348     protected void doWrite(ChannelOutboundBuffer in) {
349         throw new UnsupportedOperationException();
350     }
351 
352     @Override
353     protected Object filterOutboundMessage(Object msg) {
354         throw new UnsupportedOperationException();
355     }
356 
357     @Override
358     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
359         throw new UnsupportedOperationException();
360     }
361 
362     @Override
363     protected void doShutdown(ChannelShutdownDirection direction) {
364         throw new UnsupportedOperationException();
365     }
366 
367     @Override
368     public boolean isShutdown(ChannelShutdownDirection direction) {
369         return !isActive();
370     }
371 
372     @Override
373     void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
374                    Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
375         allocHandle.attemptedBytesRead(1);
376         final ChannelPipeline pipeline = pipeline();
377         Throwable exception = null;
378         try {
379             do {
380                 int acceptFd = socket.accept(acceptedAddress);
381                 if (acceptFd == -1) {
382                     // this means everything was handled for now
383                     allocHandle.lastBytesRead(-1);
384                     break;
385                 }
386                 allocHandle.lastBytesRead(1);
387                 allocHandle.incMessagesRead(1);
388 
389                 readPending = false;
390                 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
391                         acceptedAddress[0]));
392             } while (allocHandle.continueReading(isAutoRead(), maybeMoreData) &&
393                     !isShutdown(ChannelShutdownDirection.Inbound));
394         } catch (Throwable t) {
395             exception = t;
396         }
397         allocHandle.readComplete();
398         pipeline.fireChannelReadComplete();
399 
400         if (exception != null) {
401             pipeline.fireChannelExceptionCaught(exception);
402         }
403         readIfIsAutoRead();
404     }
405 }