1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.kqueue;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelException;
21 import io.netty5.channel.ChannelMetadata;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.ChannelOutboundBuffer;
24 import io.netty5.channel.ChannelPipeline;
25 import io.netty5.channel.ChannelShutdownDirection;
26 import io.netty5.channel.EventLoop;
27 import io.netty5.channel.EventLoopGroup;
28 import io.netty5.channel.RecvBufferAllocator;
29 import io.netty5.channel.ServerChannelRecvBufferAllocator;
30 import io.netty5.channel.socket.DomainSocketAddress;
31 import io.netty5.channel.socket.ServerSocketChannel;
32 import io.netty5.channel.socket.SocketProtocolFamily;
33 import io.netty5.channel.unix.IntegerUnixChannelOption;
34 import io.netty5.channel.unix.RawUnixChannelOption;
35 import io.netty5.channel.unix.UnixChannel;
36 import io.netty5.util.NetUtil;
37 import io.netty5.util.internal.UnstableApi;
38 import io.netty5.util.internal.logging.InternalLogger;
39 import io.netty5.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.File;
42 import java.io.IOException;
43 import java.net.ProtocolFamily;
44 import java.net.SocketAddress;
45 import java.util.Set;
46 import java.util.function.Predicate;
47
48 import static io.netty5.channel.ChannelOption.SO_BACKLOG;
49 import static io.netty5.channel.ChannelOption.SO_RCVBUF;
50 import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
51 import static io.netty5.channel.ChannelOption.TCP_FASTOPEN;
52 import static io.netty5.channel.kqueue.BsdSocket.newSocketStream;
53 import static io.netty5.channel.kqueue.KQueueChannelOption.SO_ACCEPTFILTER;
54 import static io.netty5.channel.unix.NativeInetAddress.address;
55 import static io.netty5.channel.unix.UnixChannelOption.SO_REUSEPORT;
56 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 @UnstableApi
86 public final class KQueueServerSocketChannel extends
87 AbstractKQueueChannel<UnixChannel> implements ServerSocketChannel {
88 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
89 KQueueServerSocketChannel.class);
90
91 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
92
93 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN = supportedOptionsDomainSocket();
94
95 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
96 private final EventLoopGroup childEventLoopGroup;
97
98
99
100
101 private final byte[] acceptedAddress = new byte[26];
102
103 private volatile int backlog = NetUtil.SOMAXCONN;
104 private volatile boolean enableTcpFastOpen;
105
106 public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
107 super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(),
108 newSocketStream(), false);
109 this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
110 KQueueSocketChannel.class);
111 }
112
113 public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
114 SocketProtocolFamily protocolFamily) {
115 super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(),
116 BsdSocket.newSocket(protocolFamily), false);
117 this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
118 KQueueSocketChannel.class);
119 }
120
121 public KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup,
122 int fd, ProtocolFamily protocolFamily) {
123
124
125 this(eventLoop, childEventLoopGroup, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)));
126 }
127
128 private KQueueServerSocketChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup, BsdSocket socket) {
129
130
131 super(null, eventLoop, METADATA, new ServerChannelRecvBufferAllocator(), socket, isSoErrorZero(socket));
132 this.childEventLoopGroup = validateEventLoopGroup(childEventLoopGroup, "childEventLoopGroup",
133 KQueueSocketChannel.class);
134 }
135
136 @Override
137 protected void doBind(SocketAddress localAddress) throws Exception {
138 super.doBind(localAddress);
139 socket.listen(getBacklog());
140 if (socket.protocolFamily() != SocketProtocolFamily.UNIX && isTcpFastOpen()) {
141 socket.setTcpFastOpen(true);
142 }
143 active = true;
144 }
145
146 @SuppressWarnings("unchecked")
147 @Override
148 protected <T> T getExtendedOption(ChannelOption<T> option) {
149 if (isSupported(socket.protocolFamily(), option)) {
150 if (option == SO_RCVBUF) {
151 return (T) Integer.valueOf(getReceiveBufferSize());
152 }
153 if (option == SO_REUSEADDR) {
154 return (T) Boolean.valueOf(isReuseAddress());
155 }
156 if (option == SO_BACKLOG) {
157 return (T) Integer.valueOf(getBacklog());
158 }
159 if (option == TCP_FASTOPEN) {
160 return (T) (isTcpFastOpen() ? Integer.valueOf(1) : Integer.valueOf(0));
161 }
162 if (option == SO_REUSEPORT) {
163 return (T) Boolean.valueOf(isReusePort());
164 }
165 if (option == SO_ACCEPTFILTER) {
166 return (T) getAcceptFilter();
167 }
168 }
169 return super.getExtendedOption(option);
170 }
171
172 @Override
173 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
174 if (isSupported(socket.protocolFamily(), option)) {
175 if (option == SO_RCVBUF) {
176 setReceiveBufferSize((Integer) value);
177 } else if (option == SO_REUSEADDR) {
178 setReuseAddress((Boolean) value);
179 } else if (option == SO_BACKLOG) {
180 setBacklog((Integer) value);
181 } else if (option == TCP_FASTOPEN) {
182 setTcpFastOpen((Integer) value > 0);
183 } else if (option == SO_REUSEPORT) {
184 setReusePort((Boolean) value);
185 } else if (option == SO_ACCEPTFILTER) {
186 setAcceptFilter((AcceptFilter) value);
187 }
188 } else {
189 super.setExtendedOption(option, value);
190 }
191 }
192
193 private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
194 if (protocolFamily == SocketProtocolFamily.UNIX) {
195 return SUPPORTED_OPTIONS_DOMAIN.contains(option);
196 }
197 return SUPPORTED_OPTIONS.contains(option);
198 }
199
200 @Override
201 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
202 return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
203 }
204
205 private static Set<ChannelOption<?>> supportedOptions() {
206 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG,
207 TCP_FASTOPEN, SO_REUSEPORT, SO_ACCEPTFILTER);
208 }
209
210 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
211 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, SO_ACCEPTFILTER);
212 }
213
214 private boolean isReuseAddress() {
215 try {
216 return socket.isReuseAddress();
217 } catch (IOException e) {
218 throw new ChannelException(e);
219 }
220 }
221
222 private void setReuseAddress(boolean reuseAddress) {
223 try {
224 socket.setReuseAddress(reuseAddress);
225 } catch (IOException e) {
226 throw new ChannelException(e);
227 }
228 }
229
230 private int getReceiveBufferSize() {
231 try {
232 return socket.getReceiveBufferSize();
233 } catch (IOException e) {
234 throw new ChannelException(e);
235 }
236 }
237
238 private void setReceiveBufferSize(int receiveBufferSize) {
239 try {
240 socket.setReceiveBufferSize(receiveBufferSize);
241 } catch (IOException e) {
242 throw new ChannelException(e);
243 }
244 }
245
246 private int getBacklog() {
247 return backlog;
248 }
249
250 private void setBacklog(int backlog) {
251 checkPositiveOrZero(backlog, "backlog");
252 this.backlog = backlog;
253 }
254
255
256
257
258
259
260 private boolean isTcpFastOpen() {
261 return enableTcpFastOpen;
262 }
263
264
265
266
267
268
269
270
271
272 private void setTcpFastOpen(boolean enableTcpFastOpen) {
273 this.enableTcpFastOpen = enableTcpFastOpen;
274 }
275
276 private void setReusePort(boolean reusePort) {
277 try {
278 socket.setReusePort(reusePort);
279 } catch (IOException e) {
280 throw new ChannelException(e);
281 }
282 }
283
284 private boolean isReusePort() {
285 try {
286 return socket.isReusePort();
287 } catch (IOException e) {
288 throw new ChannelException(e);
289 }
290 }
291
292 private void setAcceptFilter(AcceptFilter acceptFilter) {
293 try {
294 socket.setAcceptFilter(acceptFilter);
295 } catch (IOException e) {
296 throw new ChannelException(e);
297 }
298 }
299
300 private AcceptFilter getAcceptFilter() {
301 try {
302 return socket.getAcceptFilter();
303 } catch (IOException e) {
304 throw new ChannelException(e);
305 }
306 }
307
308 private Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
309 final SocketAddress remote;
310 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
311 remote = null;
312 } else {
313 remote = address(address, offset, len);
314 }
315 return new KQueueSocketChannel(this, childEventLoopGroup().next(),
316 new BsdSocket(fd, socket.protocolFamily()), remote);
317 }
318
319 @Override
320 protected void doClose() throws Exception {
321 SocketAddress local = localAddress();
322 try {
323 super.doClose();
324 } finally {
325 if (local != null && socket.protocolFamily() == SocketProtocolFamily.UNIX) {
326 String path = ((DomainSocketAddress) local).path();
327
328 File socketFile = new File(path);
329 boolean success = socketFile.delete();
330 if (!success && logger.isDebugEnabled()) {
331 logger.debug("Failed to delete a domain socket file: {}", path);
332 }
333 }
334 }
335 }
336
337 @Override
338 public EventLoopGroup childEventLoopGroup() {
339 return childEventLoopGroup;
340 }
341
342 @Override
343 protected SocketAddress remoteAddress0() {
344 return null;
345 }
346
347 @Override
348 protected void doWrite(ChannelOutboundBuffer in) {
349 throw new UnsupportedOperationException();
350 }
351
352 @Override
353 protected Object filterOutboundMessage(Object msg) {
354 throw new UnsupportedOperationException();
355 }
356
357 @Override
358 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
359 throw new UnsupportedOperationException();
360 }
361
362 @Override
363 protected void doShutdown(ChannelShutdownDirection direction) {
364 throw new UnsupportedOperationException();
365 }
366
367 @Override
368 public boolean isShutdown(ChannelShutdownDirection direction) {
369 return !isActive();
370 }
371
372 @Override
373 void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
374 Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
375 allocHandle.attemptedBytesRead(1);
376 final ChannelPipeline pipeline = pipeline();
377 Throwable exception = null;
378 try {
379 do {
380 int acceptFd = socket.accept(acceptedAddress);
381 if (acceptFd == -1) {
382
383 allocHandle.lastBytesRead(-1);
384 break;
385 }
386 allocHandle.lastBytesRead(1);
387 allocHandle.incMessagesRead(1);
388
389 readPending = false;
390 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
391 acceptedAddress[0]));
392 } while (allocHandle.continueReading(isAutoRead(), maybeMoreData) &&
393 !isShutdown(ChannelShutdownDirection.Inbound));
394 } catch (Throwable t) {
395 exception = t;
396 }
397 allocHandle.readComplete();
398 pipeline.fireChannelReadComplete();
399
400 if (exception != null) {
401 pipeline.fireChannelExceptionCaught(exception);
402 }
403 readIfIsAutoRead();
404 }
405 }