1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelException;
21 import io.netty5.channel.ChannelMetadata;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.ChannelOutboundBuffer;
24 import io.netty5.channel.ChannelPipeline;
25 import io.netty5.channel.ChannelShutdownDirection;
26 import io.netty5.channel.EventLoop;
27 import io.netty5.channel.EventLoopGroup;
28 import io.netty5.channel.RecvBufferAllocator;
29 import io.netty5.channel.ServerChannelRecvBufferAllocator;
30 import io.netty5.channel.socket.DomainSocketAddress;
31 import io.netty5.channel.socket.ServerSocketChannel;
32 import io.netty5.channel.socket.SocketProtocolFamily;
33 import io.netty5.channel.unix.IntegerUnixChannelOption;
34 import io.netty5.channel.unix.RawUnixChannelOption;
35 import io.netty5.channel.unix.UnixChannel;
36 import io.netty5.channel.unix.UnixChannelOption;
37 import io.netty5.util.NetUtil;
38 import io.netty5.util.internal.logging.InternalLogger;
39 import io.netty5.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.File;
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.ProtocolFamily;
45 import java.net.SocketAddress;
46 import java.util.Collection;
47 import java.util.Collections;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.function.Predicate;
51
52 import static io.netty5.channel.ChannelOption.SO_BACKLOG;
53 import static io.netty5.channel.ChannelOption.SO_RCVBUF;
54 import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
55 import static io.netty5.channel.ChannelOption.TCP_FASTOPEN;
56 import static io.netty5.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_SERVER;
57 import static io.netty5.channel.unix.NativeInetAddress.address;
58 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 public final class EpollServerSocketChannel
91 extends AbstractEpollChannel<UnixChannel>
92 implements ServerSocketChannel {
93
94 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
95 EpollServerSocketChannel.class);
96 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
97 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
98
99 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
100
101 private static Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA = h -> h.lastBytesRead() > 0;
102
103 private final EventLoopGroup childEventLoopGroup;
104
105
106
107 private final byte[] acceptedAddress = new byte[26];
108
109 private volatile int backlog = NetUtil.SOMAXCONN;
110 private volatile int pendingFastOpenRequestsThreshold;
111
112 private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
113
114 public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
115 super(eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(), LinuxSocket.newSocketStream());
116 this.childEventLoopGroup = validateEventLoopGroup(
117 childEventLoopGroup, "childEventLoopGroup", EpollSocketChannel.class);
118 }
119
120 public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
121 ProtocolFamily protocolFamily) {
122 super(null, eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(),
123 LinuxSocket.newSocket(protocolFamily), false);
124 this.childEventLoopGroup = validateEventLoopGroup(
125 childEventLoopGroup, "childEventLoopGroup", EpollSocketChannel.class);
126 }
127
128 public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, int fd,
129 ProtocolFamily protocolFamily) {
130 this(eventLoop, childEventLoopGroup, new LinuxSocket(fd, SocketProtocolFamily.of(protocolFamily)));
131 }
132
133 private EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, LinuxSocket socket) {
134
135
136 super(null, eventLoop, METADATA, 0, new ServerChannelRecvBufferAllocator(), socket, isSoErrorZero(socket));
137 this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
138 EpollSocketChannel.class);
139 }
140
141 @Override
142 public EventLoopGroup childEventLoopGroup() {
143 return childEventLoopGroup;
144 }
145
146 @SuppressWarnings("unchecked")
147 @Override
148 protected <T> T getExtendedOption(ChannelOption<T> option) {
149 if (isOptionSupported(socket.protocolFamily(), option)) {
150 if (option == SO_RCVBUF) {
151 return (T) Integer.valueOf(getReceiveBufferSize());
152 }
153 if (option == SO_REUSEADDR) {
154 return (T) Boolean.valueOf(isReuseAddress());
155 }
156 if (option == SO_BACKLOG) {
157 return (T) Integer.valueOf(getBacklog());
158 }
159 if (option == TCP_FASTOPEN) {
160 return (T) Integer.valueOf(getTcpFastopen());
161 }
162 if (option == EpollChannelOption.IP_FREEBIND) {
163 return (T) Boolean.valueOf(isIpFreebind());
164 }
165 if (option == EpollChannelOption.TCP_DEFER_ACCEPT) {
166 return (T) Integer.valueOf(getTcpDeferAccept());
167 }
168 if (option == UnixChannelOption.SO_REUSEPORT) {
169 return (T) Boolean.valueOf(isReusePort());
170 }
171 if (option == EpollChannelOption.TCP_MD5SIG) {
172 return null;
173 }
174 }
175
176 return super.getExtendedOption(option);
177 }
178
179 @SuppressWarnings("unchecked")
180 @Override
181 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
182 if (isOptionSupported(socket.protocolFamily(), option)) {
183 if (option == SO_RCVBUF) {
184 setReceiveBufferSize((Integer) value);
185 } else if (option == SO_REUSEADDR) {
186 setReuseAddress((Boolean) value);
187 } else if (option == SO_BACKLOG) {
188 setBacklog((Integer) value);
189 } else if (option == TCP_FASTOPEN) {
190 setTcpFastopen((Integer) value);
191 } else if (option == EpollChannelOption.IP_FREEBIND) {
192 setIpFreebind((Boolean) value);
193 } else if (option == EpollChannelOption.TCP_DEFER_ACCEPT) {
194 setTcpDeferAccept((Integer) value);
195 } else if (option == UnixChannelOption.SO_REUSEPORT) {
196 setReusePort((Boolean) value);
197 } else if (option == EpollChannelOption.TCP_MD5SIG) {
198 setTcpMd5Sig((Map<InetAddress, byte[]>) value);
199 }
200 } else {
201 super.setExtendedOption(option, value);
202 }
203 }
204
205 private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
206 if (family == SocketProtocolFamily.UNIX) {
207 return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
208 }
209 return SUPPORTED_OPTIONS.contains(option);
210 }
211
212 @Override
213 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
214 return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
215 }
216
217 private static Set<ChannelOption<?>> supportedOptions() {
218 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, TCP_FASTOPEN,
219 EpollChannelOption.TCP_MD5SIG, EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND,
220 EpollChannelOption.TCP_DEFER_ACCEPT);
221 }
222
223 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
224 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
225 }
226
227 private boolean isReuseAddress() {
228 try {
229 return socket.isReuseAddress();
230 } catch (IOException e) {
231 throw new ChannelException(e);
232 }
233 }
234
235 private void setReuseAddress(boolean reuseAddress) {
236 try {
237 socket.setReuseAddress(reuseAddress);
238 } catch (IOException e) {
239 throw new ChannelException(e);
240 }
241 }
242
243 private void setReusePort(boolean reusePort) {
244 try {
245 socket.setReusePort(reusePort);
246 } catch (IOException e) {
247 throw new ChannelException(e);
248 }
249 }
250
251 private boolean isReusePort() {
252 try {
253 return socket.isReusePort();
254 } catch (IOException e) {
255 throw new ChannelException(e);
256 }
257 }
258
259 private void setIpFreebind(boolean reusePort) {
260 try {
261 socket.setIpFreeBind(reusePort);
262 } catch (IOException e) {
263 throw new ChannelException(e);
264 }
265 }
266
267 private boolean isIpFreebind() {
268 try {
269 return socket.isIpFreeBind();
270 } catch (IOException e) {
271 throw new ChannelException(e);
272 }
273 }
274
275 private int getReceiveBufferSize() {
276 try {
277 return socket.getReceiveBufferSize();
278 } catch (IOException e) {
279 throw new ChannelException(e);
280 }
281 }
282
283 private void setReceiveBufferSize(int receiveBufferSize) {
284 try {
285 socket.setReceiveBufferSize(receiveBufferSize);
286 } catch (IOException e) {
287 throw new ChannelException(e);
288 }
289 }
290
291 private int getBacklog() {
292 return backlog;
293 }
294
295 private void setBacklog(int backlog) {
296 checkPositiveOrZero(backlog, "backlog");
297 this.backlog = backlog;
298 }
299
300 private int getTcpDeferAccept() {
301 try {
302 return socket.getTcpDeferAccept();
303 } catch (IOException e) {
304 throw new ChannelException(e);
305 }
306 }
307
308 private void setTcpDeferAccept(int deferAccept) {
309 try {
310 socket.setTcpDeferAccept(deferAccept);
311 } catch (IOException e) {
312 throw new ChannelException(e);
313 }
314 }
315
316
317
318
319
320
321 private int getTcpFastopen() {
322 return pendingFastOpenRequestsThreshold;
323 }
324
325
326
327
328
329
330
331
332
333
334 private void setTcpFastopen(int pendingFastOpenRequestsThreshold) {
335 checkPositiveOrZero(pendingFastOpenRequestsThreshold, "pendingFastOpenRequestsThreshold");
336 this.pendingFastOpenRequestsThreshold = pendingFastOpenRequestsThreshold;
337 }
338
339 @Override
340 protected void doBind(SocketAddress localAddress) throws Exception {
341 super.doBind(localAddress);
342 final int tcpFastopen;
343 if (socket.protocolFamily() != SocketProtocolFamily.UNIX &&
344 IS_SUPPORTING_TCP_FASTOPEN_SERVER && (tcpFastopen = getTcpFastopen()) > 0) {
345 socket.setTcpFastOpen(tcpFastopen);
346 }
347 socket.listen(getBacklog());
348 active = true;
349 }
350
351 @Override
352 protected void doWrite(ChannelOutboundBuffer in) {
353 throw new UnsupportedOperationException();
354 }
355
356 @Override
357 protected Object filterOutboundMessage(Object msg) {
358 throw new UnsupportedOperationException();
359 }
360
361 @Override
362 protected void epollInReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
363 boolean receivedRdHup) {
364 final ChannelPipeline pipeline = pipeline();
365 allocHandle.attemptedBytesRead(1);
366 Throwable exception = null;
367 try {
368 do {
369
370
371
372 allocHandle.lastBytesRead(socket.accept(acceptedAddress));
373 if (allocHandle.lastBytesRead() == -1) {
374
375 break;
376 }
377 allocHandle.incMessagesRead(1);
378
379 readPending = false;
380 pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1,
381 acceptedAddress[0]));
382 } while (allocHandle.continueReading(isAutoRead(), MAYBE_MORE_DATA)
383 && !isShutdown(ChannelShutdownDirection.Inbound));
384 } catch (Throwable t) {
385 exception = t;
386 }
387 allocHandle.readComplete();
388 pipeline.fireChannelReadComplete();
389
390 if (exception != null) {
391 pipeline.fireChannelExceptionCaught(exception);
392 }
393 readIfIsAutoRead();
394 }
395
396 @Override
397 protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
398 return handle.lastBytesRead() > 0;
399 }
400
401 @Override
402 protected void doShutdown(ChannelShutdownDirection direction) {
403 throw new UnsupportedOperationException();
404 }
405
406 @Override
407 public boolean isShutdown(ChannelShutdownDirection direction) {
408 return !isActive();
409 }
410
411 @Override
412 protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
413
414 throw new UnsupportedOperationException();
415 }
416
417 @Override
418 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
419 throw new UnsupportedOperationException();
420 }
421
422 private Channel newChildChannel(int fd, byte[] address, int offset, int len) {
423 final SocketAddress remote;
424 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
425 remote = null;
426 } else {
427 remote = address(address, offset, len);
428 }
429 return new EpollSocketChannel(this, childEventLoopGroup().next(),
430 new LinuxSocket(fd, socket.protocolFamily()), remote);
431 }
432
433 @Override
434 protected void doClose() throws Exception {
435 try {
436 super.doClose();
437 } finally {
438 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
439 DomainSocketAddress local = (DomainSocketAddress) localAddress();
440 if (local != null) {
441
442 File socketFile = new File(local.path());
443 boolean success = socketFile.delete();
444 if (!success && logger.isDebugEnabled()) {
445 logger.debug("Failed to delete a domain socket file: {}", local.path());
446 }
447 }
448 }
449 }
450 }
451
452 Collection<InetAddress> tcpMd5SigAddresses() {
453 return tcpMd5SigAddresses;
454 }
455
456 private void setTcpMd5Sig(Map<InetAddress, byte[]> keys) {
457
458 synchronized (this) {
459 try {
460 tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
461 } catch (IOException e) {
462 throw new ChannelException(e);
463 }
464 }
465 }
466 }