/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelException;
21  import io.netty5.channel.ChannelMetadata;
22  import io.netty5.channel.ChannelOption;
23  import io.netty5.channel.ChannelOutboundBuffer;
24  import io.netty5.channel.ChannelPipeline;
25  import io.netty5.channel.ChannelShutdownDirection;
26  import io.netty5.channel.EventLoop;
27  import io.netty5.channel.EventLoopGroup;
28  import io.netty5.channel.RecvBufferAllocator;
29  import io.netty5.channel.ServerChannelRecvBufferAllocator;
30  import io.netty5.channel.socket.DomainSocketAddress;
31  import io.netty5.channel.socket.ServerSocketChannel;
32  import io.netty5.channel.socket.SocketProtocolFamily;
33  import io.netty5.channel.unix.IntegerUnixChannelOption;
34  import io.netty5.channel.unix.RawUnixChannelOption;
35  import io.netty5.channel.unix.UnixChannel;
36  import io.netty5.channel.unix.UnixChannelOption;
37  import io.netty5.util.NetUtil;
38  import io.netty5.util.internal.logging.InternalLogger;
39  import io.netty5.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.File;
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.ProtocolFamily;
45  import java.net.SocketAddress;
46  import java.util.Collection;
47  import java.util.Collections;
48  import java.util.Map;
49  import java.util.Set;
50  import java.util.function.Predicate;
51  
52  import static io.netty5.channel.ChannelOption.SO_BACKLOG;
53  import static io.netty5.channel.ChannelOption.SO_RCVBUF;
54  import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
55  import static io.netty5.channel.ChannelOption.TCP_FASTOPEN;
56  import static io.netty5.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_SERVER;
57  import static io.netty5.channel.unix.NativeInetAddress.address;
58  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
59  
60  /**
61   * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
62   * maximal performance.
63   *
64   * <h3>Available options</h3>
65   *
66   * In addition to the options provided by {@link ServerSocketChannel} and {@link UnixChannel},
67   * {@link EpollServerSocketChannel} allows the following options in the option map:
68   *
69   * <table border="1" cellspacing="0" cellpadding="6">
70   * <tr>
71   * <th>{@link ChannelOption}</th>
72   * <th>{@code INET}</th>
73   * <th>{@code INET6}</th>
74   * <th>{@code UNIX}</th>
75   * </tr><tr>
76   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
77   * </tr><tr>
78   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
79   * </tr><tr>
80   * <td>{@link UnixChannelOption#SO_REUSEPORT}</td><td>X</td><td>X</td><td>-</td>
81   * </tr><tr>
82   * <td>{@link EpollChannelOption#IP_FREEBIND}</td><td>X</td><td>X</td><td>-</td>
83   * </tr><tr>
84   * <td>{@link EpollChannelOption#TCP_DEFER_ACCEPT}</td><td>X</td><td>X</td><td>-</td>
85   * </tr><tr>
86   * <td>{@link ChannelOption#TCP_FASTOPEN}</td><td>X</td><td>X</td><td>-</td>
87   * </tr>
88   * </table>
89   */
90  public final class EpollServerSocketChannel
91          extends AbstractEpollChannel<UnixChannel>
92          implements ServerSocketChannel {
93  
94      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
95              EpollServerSocketChannel.class);
96      private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
97      private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
98  
99      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
100 
101     private static Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA = h -> h.lastBytesRead() > 0;
102 
103     private final EventLoopGroup childEventLoopGroup;
104     // Will hold the remote address after accept(...) was successful.
105     // We need 24 bytes for the address as maximum + 1 byte for storing the length.
106     // So use 26 bytes as it's a power of two.
107     private final byte[] acceptedAddress = new byte[26];
108 
109     private volatile int backlog = NetUtil.SOMAXCONN;
110     private volatile int pendingFastOpenRequestsThreshold;
111 
112     private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
113 
114     public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
115         super(eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(), LinuxSocket.newSocketStream());
116         this.childEventLoopGroup = validateEventLoopGroup(
117                 childEventLoopGroup, "childEventLoopGroup", EpollSocketChannel.class);
118     }
119 
120     public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
121                                     ProtocolFamily protocolFamily) {
122         super(null, eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(),
123                 LinuxSocket.newSocket(protocolFamily), false);
124         this.childEventLoopGroup = validateEventLoopGroup(
125                 childEventLoopGroup, "childEventLoopGroup", EpollSocketChannel.class);
126     }
127 
128     public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, int fd,
129                                     ProtocolFamily protocolFamily) {
130         this(eventLoop, childEventLoopGroup, new LinuxSocket(fd, SocketProtocolFamily.of(protocolFamily)));
131     }
132 
133     private EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, LinuxSocket socket) {
134         // Must call this constructor to ensure this object's local address is configured correctly.
135         // The local address can only be obtained from a Socket object.
136         super(null, eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(), socket, isSoErrorZero(socket));
137         this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
138                 EpollSocketChannel.class);
139     }
140 
141     @Override
142     public EventLoopGroup childEventLoopGroup() {
143         return childEventLoopGroup;
144     }
145 
146     @SuppressWarnings("unchecked")
147     @Override
148     protected <T> T getExtendedOption(ChannelOption<T> option) {
149         if (isOptionSupported(socket.protocolFamily(), option)) {
150             if (option == SO_RCVBUF) {
151                 return (T) Integer.valueOf(getReceiveBufferSize());
152             }
153             if (option == SO_REUSEADDR) {
154                 return (T) Boolean.valueOf(isReuseAddress());
155             }
156             if (option == SO_BACKLOG) {
157                 return (T) Integer.valueOf(getBacklog());
158             }
159             if (option == TCP_FASTOPEN) {
160                 return (T) Integer.valueOf(getTcpFastopen());
161             }
162             if (option == EpollChannelOption.IP_FREEBIND) {
163                 return (T) Boolean.valueOf(isIpFreebind());
164             }
165             if (option == EpollChannelOption.TCP_DEFER_ACCEPT) {
166                 return (T) Integer.valueOf(getTcpDeferAccept());
167             }
168             if (option == UnixChannelOption.SO_REUSEPORT) {
169                 return (T) Boolean.valueOf(isReusePort());
170             }
171             if (option == EpollChannelOption.TCP_MD5SIG) {
172                 return null;
173             }
174         }
175 
176         return super.getExtendedOption(option);
177     }
178 
179     @SuppressWarnings("unchecked")
180     @Override
181     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
182         if (isOptionSupported(socket.protocolFamily(), option)) {
183             if (option == SO_RCVBUF) {
184                 setReceiveBufferSize((Integer) value);
185             } else if (option == SO_REUSEADDR) {
186                 setReuseAddress((Boolean) value);
187             } else if (option == SO_BACKLOG) {
188                 setBacklog((Integer) value);
189             } else if (option == TCP_FASTOPEN) {
190                 setTcpFastopen((Integer) value);
191             } else if (option == EpollChannelOption.IP_FREEBIND) {
192                 setIpFreebind((Boolean) value);
193             } else if (option == EpollChannelOption.TCP_DEFER_ACCEPT) {
194                 setTcpDeferAccept((Integer) value);
195             } else if (option == UnixChannelOption.SO_REUSEPORT) {
196                 setReusePort((Boolean) value);
197             } else if (option == EpollChannelOption.TCP_MD5SIG) {
198                 setTcpMd5Sig((Map<InetAddress, byte[]>) value);
199             }
200         } else {
201             super.setExtendedOption(option, value);
202         }
203     }
204 
205     private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
206         if (family == SocketProtocolFamily.UNIX) {
207             return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
208         }
209         return SUPPORTED_OPTIONS.contains(option);
210     }
211 
212     @Override
213     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
214         return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
215     }
216 
217     private static Set<ChannelOption<?>> supportedOptions() {
218         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, TCP_FASTOPEN,
219                 EpollChannelOption.TCP_MD5SIG, EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND,
220                 EpollChannelOption.TCP_DEFER_ACCEPT);
221     }
222 
223     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
224         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
225     }
226 
227     private boolean isReuseAddress() {
228         try {
229             return socket.isReuseAddress();
230         } catch (IOException e) {
231             throw new ChannelException(e);
232         }
233     }
234 
235     private void setReuseAddress(boolean reuseAddress) {
236         try {
237             socket.setReuseAddress(reuseAddress);
238         } catch (IOException e) {
239             throw new ChannelException(e);
240         }
241     }
242 
243     private void setReusePort(boolean reusePort) {
244         try {
245             socket.setReusePort(reusePort);
246         } catch (IOException e) {
247             throw new ChannelException(e);
248         }
249     }
250 
251     private boolean isReusePort() {
252         try {
253             return socket.isReusePort();
254         } catch (IOException e) {
255             throw new ChannelException(e);
256         }
257     }
258 
259     private void setIpFreebind(boolean reusePort) {
260         try {
261             socket.setIpFreeBind(reusePort);
262         } catch (IOException e) {
263             throw new ChannelException(e);
264         }
265     }
266 
267     private boolean isIpFreebind() {
268         try {
269             return socket.isIpFreeBind();
270         } catch (IOException e) {
271             throw new ChannelException(e);
272         }
273     }
274 
275     private int getReceiveBufferSize() {
276         try {
277             return socket.getReceiveBufferSize();
278         } catch (IOException e) {
279             throw new ChannelException(e);
280         }
281     }
282 
283     private void setReceiveBufferSize(int receiveBufferSize) {
284         try {
285             socket.setReceiveBufferSize(receiveBufferSize);
286         } catch (IOException e) {
287             throw new ChannelException(e);
288         }
289     }
290 
291     private int getBacklog() {
292         return backlog;
293     }
294 
295     private void setBacklog(int backlog) {
296         checkPositiveOrZero(backlog, "backlog");
297         this.backlog = backlog;
298     }
299 
300     private int getTcpDeferAccept() {
301         try {
302             return socket.getTcpDeferAccept();
303         } catch (IOException e) {
304             throw new ChannelException(e);
305         }
306     }
307 
308     private void setTcpDeferAccept(int deferAccept) {
309         try {
310             socket.setTcpDeferAccept(deferAccept);
311         } catch (IOException e) {
312             throw new ChannelException(e);
313         }
314     }
315 
316     /**
317      * Returns threshold value of number of pending for fast open connect.
318      *
319      * @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
320      */
321     private int getTcpFastopen() {
322         return pendingFastOpenRequestsThreshold;
323     }
324 
325     /**
326      * Enables tcpFastOpen on the server channel. If the underlying os doesn't support TCP_FASTOPEN setting this has no
327      * effect. This has to be set before doing listen on the socket otherwise this takes no effect.
328      *
329      * @param pendingFastOpenRequestsThreshold number of requests to be pending for fastopen at a given point in time
330      * for security.
331      *
332      * @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
333      */
334     private void setTcpFastopen(int pendingFastOpenRequestsThreshold) {
335         checkPositiveOrZero(pendingFastOpenRequestsThreshold, "pendingFastOpenRequestsThreshold");
336         this.pendingFastOpenRequestsThreshold = pendingFastOpenRequestsThreshold;
337     }
338 
339     @Override
340     protected void doBind(SocketAddress localAddress) throws Exception {
341         super.doBind(localAddress);
342         final int tcpFastopen;
343         if (socket.protocolFamily() != SocketProtocolFamily.UNIX &&
344                 IS_SUPPORTING_TCP_FASTOPEN_SERVER && (tcpFastopen = getTcpFastopen()) > 0) {
345             socket.setTcpFastOpen(tcpFastopen);
346         }
347         socket.listen(getBacklog());
348         active = true;
349     }
350 
351     @Override
352     protected void doWrite(ChannelOutboundBuffer in) {
353         throw new UnsupportedOperationException();
354     }
355 
356     @Override
357     protected Object filterOutboundMessage(Object msg) {
358         throw new UnsupportedOperationException();
359     }
360 
361     @Override
362     protected void epollInReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
363                                 boolean receivedRdHup) {
364         final ChannelPipeline pipeline = pipeline();
365         allocHandle.attemptedBytesRead(1);
366         Throwable exception = null;
367         try {
368             do {
369                 // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
370                 // RecvBufferAllocator.Handle knows if it should try to read again or not when autoRead is
371                 // enabled.
372                 allocHandle.lastBytesRead(socket.accept(acceptedAddress));
373                 if (allocHandle.lastBytesRead() == -1) {
374                     // this means everything was handled for now
375                     break;
376                 }
377                 allocHandle.incMessagesRead(1);
378 
379                 readPending = false;
380                 pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1,
381                         acceptedAddress[0]));
382             } while (allocHandle.continueReading(isAutoRead(), MAYBE_MORE_DATA)
383                     && !isShutdown(ChannelShutdownDirection.Inbound));
384         } catch (Throwable t) {
385             exception = t;
386         }
387         allocHandle.readComplete();
388         pipeline.fireChannelReadComplete();
389 
390         if (exception != null) {
391             pipeline.fireChannelExceptionCaught(exception);
392         }
393         readIfIsAutoRead();
394     }
395 
396     @Override
397     protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
398         return handle.lastBytesRead() > 0;
399     }
400 
401     @Override
402     protected void doShutdown(ChannelShutdownDirection direction) {
403         throw new UnsupportedOperationException();
404     }
405 
406     @Override
407     public boolean isShutdown(ChannelShutdownDirection direction) {
408         return !isActive();
409     }
410 
411     @Override
412     protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
413         // Connect not supported by ServerChannel implementations
414         throw new UnsupportedOperationException();
415     }
416 
417     @Override
418     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
419         throw new UnsupportedOperationException();
420     }
421 
422     private Channel newChildChannel(int fd, byte[] address, int offset, int len) {
423         final SocketAddress remote;
424         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
425             remote = null;
426         } else {
427             remote = address(address, offset, len);
428         }
429         return new EpollSocketChannel(this, childEventLoopGroup().next(),
430                 new LinuxSocket(fd, socket.protocolFamily()), remote);
431     }
432 
433     @Override
434     protected void doClose() throws Exception {
435         try {
436             super.doClose();
437         } finally {
438             if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
439                 DomainSocketAddress local = (DomainSocketAddress) localAddress();
440                 if (local != null) {
441                     // Delete the socket file if possible.
442                     File socketFile = new File(local.path());
443                     boolean success = socketFile.delete();
444                     if (!success && logger.isDebugEnabled()) {
445                         logger.debug("Failed to delete a domain socket file: {}", local.path());
446                     }
447                 }
448             }
449         }
450     }
451 
452     Collection<InetAddress> tcpMd5SigAddresses() {
453         return tcpMd5SigAddresses;
454     }
455 
456     private void setTcpMd5Sig(Map<InetAddress, byte[]> keys) {
457         // Add synchronized as newTcpMp5Sigs might do multiple operations on the socket itself.
458         synchronized (this) {
459             try {
460                 tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
461             } catch (IOException e) {
462                 throw new ChannelException(e);
463             }
464         }
465     }
466 }