1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.kqueue;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.AddressedEnvelope;
21 import io.netty5.channel.ChannelException;
22 import io.netty5.channel.ChannelMetadata;
23 import io.netty5.channel.ChannelOption;
24 import io.netty5.channel.ChannelOutboundBuffer;
25 import io.netty5.channel.ChannelPipeline;
26 import io.netty5.channel.ChannelShutdownDirection;
27 import io.netty5.channel.DefaultBufferAddressedEnvelope;
28 import io.netty5.channel.EventLoop;
29 import io.netty5.channel.FixedRecvBufferAllocator;
30 import io.netty5.channel.RecvBufferAllocator;
31 import io.netty5.channel.socket.DatagramPacket;
32 import io.netty5.channel.socket.DatagramChannel;
33 import io.netty5.channel.socket.DomainSocketAddress;
34 import io.netty5.channel.socket.SocketProtocolFamily;
35 import io.netty5.channel.unix.DatagramSocketAddress;
36 import io.netty5.channel.unix.DomainDatagramSocketAddress;
37 import io.netty5.channel.unix.Errors;
38 import io.netty5.channel.unix.IntegerUnixChannelOption;
39 import io.netty5.channel.unix.IovArray;
40 import io.netty5.channel.unix.RawUnixChannelOption;
41 import io.netty5.channel.unix.RecvFromAddressDomainSocket;
42 import io.netty5.channel.unix.UnixChannel;
43 import io.netty5.channel.unix.UnixChannelOption;
44 import io.netty5.channel.unix.UnixChannelUtil;
45 import io.netty5.util.concurrent.Future;
46 import io.netty5.util.internal.SilentDispose;
47 import io.netty5.util.internal.StringUtil;
48 import io.netty5.util.internal.UnstableApi;
49 import io.netty5.util.internal.logging.InternalLogger;
50 import io.netty5.util.internal.logging.InternalLoggerFactory;
51
52 import java.io.IOException;
53 import java.net.InetAddress;
54 import java.net.InetSocketAddress;
55 import java.net.NetworkInterface;
56 import java.net.PortUnreachableException;
57 import java.net.ProtocolFamily;
58 import java.net.SocketAddress;
59 import java.nio.ByteBuffer;
60 import java.util.Set;
61 import java.util.function.Predicate;
62
63 import static io.netty5.channel.ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION;
64 import static io.netty5.channel.ChannelOption.IP_TOS;
65 import static io.netty5.channel.ChannelOption.SO_BROADCAST;
66 import static io.netty5.channel.ChannelOption.SO_RCVBUF;
67 import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
68 import static io.netty5.channel.ChannelOption.SO_SNDBUF;
69 import static io.netty5.channel.unix.UnixChannelOption.SO_REUSEPORT;
70 import static io.netty5.util.CharsetUtil.UTF_8;
71
72 import static java.util.Objects.requireNonNull;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @UnstableApi
98 public final class KQueueDatagramChannel
99 extends AbstractKQueueChannel<UnixChannel> implements DatagramChannel {
100 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueDatagramChannel.class);
101 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
102
103 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
104
105 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
106
107 private static final String EXPECTED_TYPES =
108 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
109 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
110 StringUtil.simpleClassName(Buffer.class) + ", " +
111 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
112 StringUtil.simpleClassName(Buffer.class) + ')';
113
114 private static final String EXPECTED_TYPES_DOMAIN =
115 " (expected: " +
116 StringUtil.simpleClassName(DatagramPacket.class) + ", " +
117 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
118 StringUtil.simpleClassName(Buffer.class) + ", " +
119 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
120 StringUtil.simpleClassName(Buffer.class) + ')';
121
122 private static final Predicate<RecvBufferAllocator.Handle> TRUE_SUPPLIER = h -> true;
123
124 private volatile boolean connected;
125 private volatile boolean inputShutdown;
126 private volatile boolean outputShutdown;
127
128 private boolean activeOnOpen;
129
130 public KQueueDatagramChannel(EventLoop eventLoop) {
131 this(eventLoop, null);
132 }
133
134 public KQueueDatagramChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
135 super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048),
136 BsdSocket.newDatagramSocket(protocolFamily), false);
137 }
138
139 public KQueueDatagramChannel(EventLoop eventLoop, int fd, ProtocolFamily protocolFamily) {
140 this(eventLoop, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)), true);
141 }
142
143 KQueueDatagramChannel(EventLoop eventLoop, BsdSocket socket, boolean active) {
144 super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048), socket, active);
145 }
146
147 @SuppressWarnings("unchecked")
148 @Override
149 protected <T> T getExtendedOption(ChannelOption<T> option) {
150 if (isSupported(socket.protocolFamily(), option)) {
151 if (option == SO_BROADCAST) {
152 return (T) Boolean.valueOf(isBroadcast());
153 }
154 if (option == SO_RCVBUF) {
155 return (T) Integer.valueOf(getReceiveBufferSize());
156 }
157 if (option == SO_SNDBUF) {
158 return (T) Integer.valueOf(getSendBufferSize());
159 }
160 if (option == SO_REUSEADDR) {
161 return (T) Boolean.valueOf(isReuseAddress());
162 }
163 if (option == IP_TOS) {
164 return (T) Integer.valueOf(getTrafficClass());
165 }
166 if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
167 return (T) Boolean.valueOf(activeOnOpen);
168 }
169 if (option == SO_REUSEPORT) {
170 return (T) Boolean.valueOf(isReusePort());
171 }
172 }
173 return super.getExtendedOption(option);
174 }
175
176 @Override
177 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
178 if (isSupported(socket.protocolFamily(), option)) {
179 if (option == SO_BROADCAST) {
180 setBroadcast((Boolean) value);
181 } else if (option == SO_RCVBUF) {
182 setReceiveBufferSize((Integer) value);
183 } else if (option == SO_SNDBUF) {
184 setSendBufferSize((Integer) value);
185 } else if (option == SO_REUSEADDR) {
186 setReuseAddress((Boolean) value);
187 } else if (option == IP_TOS) {
188 setTrafficClass((Integer) value);
189 } else if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
190 setActiveOnOpen((Boolean) value);
191 } else if (option == SO_REUSEPORT) {
192 setReusePort((Boolean) value);
193 }
194 } else {
195 super.setExtendedOption(option, value);
196 }
197 }
198
199 private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
200 if (protocolFamily == SocketProtocolFamily.UNIX) {
201 return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
202 }
203 return SUPPORTED_OPTIONS.contains(option);
204 }
205
206 @Override
207 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
208 return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
209 }
210
211 private static Set<ChannelOption<?>> supportedOptions() {
212 return newSupportedIdentityOptionsSet(SO_BROADCAST, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR, IP_TOS,
213 DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, SO_REUSEPORT);
214 }
215
216 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
217 return newSupportedIdentityOptionsSet(SO_SNDBUF, SO_RCVBUF, DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
218 }
219
220 private void setActiveOnOpen(boolean activeOnOpen) {
221 if (isRegistered()) {
222 throw new IllegalStateException("Can only changed before channel was registered");
223 }
224 this.activeOnOpen = activeOnOpen;
225 }
226
227 private boolean getActiveOnOpen() {
228 return activeOnOpen;
229 }
230
231
232
233
234 private boolean isReusePort() {
235 try {
236 return socket.isReusePort();
237 } catch (IOException e) {
238 throw new ChannelException(e);
239 }
240 }
241
242
243
244
245
246
247
248
249 private void setReusePort(boolean reusePort) {
250 try {
251 socket.setReusePort(reusePort);
252 } catch (IOException e) {
253 throw new ChannelException(e);
254 }
255 }
256
257 private int getSendBufferSize() {
258 try {
259 return socket.getSendBufferSize();
260 } catch (IOException e) {
261 throw new ChannelException(e);
262 }
263 }
264
265 public void setSendBufferSize(int sendBufferSize) {
266 try {
267 socket.setSendBufferSize(sendBufferSize);
268 } catch (IOException e) {
269 throw new ChannelException(e);
270 }
271 }
272
273 private int getReceiveBufferSize() {
274 try {
275 return socket.getReceiveBufferSize();
276 } catch (IOException e) {
277 throw new ChannelException(e);
278 }
279 }
280
281 private void setReceiveBufferSize(int receiveBufferSize) {
282 try {
283 socket.setReceiveBufferSize(receiveBufferSize);
284 } catch (IOException e) {
285 throw new ChannelException(e);
286 }
287 }
288
289 private int getTrafficClass() {
290 try {
291 return socket.getTrafficClass();
292 } catch (IOException e) {
293 throw new ChannelException(e);
294 }
295 }
296
297 private void setTrafficClass(int trafficClass) {
298 try {
299 socket.setTrafficClass(trafficClass);
300 } catch (IOException e) {
301 throw new ChannelException(e);
302 }
303 }
304
305 private boolean isReuseAddress() {
306 try {
307 return socket.isReuseAddress();
308 } catch (IOException e) {
309 throw new ChannelException(e);
310 }
311 }
312
313 private void setReuseAddress(boolean reuseAddress) {
314 try {
315 socket.setReuseAddress(reuseAddress);
316 } catch (IOException e) {
317 throw new ChannelException(e);
318 }
319 }
320
321 private boolean isBroadcast() {
322 try {
323 return socket.isBroadcast();
324 } catch (IOException e) {
325 throw new ChannelException(e);
326 }
327 }
328
329 private void setBroadcast(boolean broadcast) {
330 try {
331 socket.setBroadcast(broadcast);
332 } catch (IOException e) {
333 throw new ChannelException(e);
334 }
335 }
336
337 @Override
338 public boolean isActive() {
339 return socket.isOpen() && (getActiveOnOpen() && isRegistered() || active);
340 }
341
342 @Override
343 public boolean isConnected() {
344 return connected;
345 }
346
347 @Override
348 protected void doBind(SocketAddress localAddress) throws Exception {
349 super.doBind(localAddress);
350 active = true;
351 }
352
353 private boolean doWriteMessage(Object msg) throws Exception {
354 final Object data;
355 final SocketAddress remoteAddress;
356 if (msg instanceof AddressedEnvelope) {
357 @SuppressWarnings("unchecked")
358 AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
359 data = envelope.content();
360 remoteAddress = envelope.recipient();
361 } else {
362 data = msg;
363 remoteAddress = null;
364 }
365
366 return doWriteBufferMessage((Buffer) data, remoteAddress);
367 }
368
369 private boolean doWriteBufferMessage(Buffer data, SocketAddress remoteAddress) throws IOException {
370 final int initialReadableBytes = data.readableBytes();
371 if (initialReadableBytes == 0) {
372 return true;
373 }
374
375 if (data.countReadableComponents() > 1) {
376 IovArray array = registration().cleanArray();
377 data.forEachReadable(0, array);
378 int count = array.count();
379 assert count != 0;
380
381 final long writtenBytes;
382 if (remoteAddress == null) {
383 writtenBytes = socket.writevAddresses(array.memoryAddress(0), count);
384 } else {
385 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
386 writtenBytes = socket.sendToAddressesDomainSocket(
387 array.memoryAddress(0), count,
388 ((DomainSocketAddress) remoteAddress).path().getBytes(UTF_8));
389 } else {
390 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
391 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), count,
392 inetSocketAddress.getAddress(), inetSocketAddress.getPort());
393 }
394 }
395 return writtenBytes > 0;
396 } else {
397 if (remoteAddress == null) {
398 data.forEachReadable(0, (index, component) -> {
399 int written = socket.writeAddress(component.readableNativeAddress(), 0, component.readableBytes());
400 component.skipReadableBytes(written);
401 return false;
402 });
403 } else {
404 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
405 byte[] path = ((DomainSocketAddress) remoteAddress).path().getBytes(UTF_8);
406 data.forEachReadable(0, (index, component) -> {
407 int written = socket.sendToAddressDomainSocket(
408 component.readableNativeAddress(), 0, component.readableBytes(), path);
409 component.skipReadableBytes(written);
410 return false;
411 });
412 } else {
413 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
414 data.forEachReadable(0, (index, component) -> {
415 int written = socket.sendToAddress(component.readableNativeAddress(), 0,
416 component.readableBytes(), inetSocketAddress.getAddress(), inetSocketAddress.getPort());
417 component.skipReadableBytes(written);
418 return false;
419 });
420 }
421 }
422 return data.readableBytes() < initialReadableBytes;
423 }
424 }
425
426 @Override
427 protected Object filterOutboundMessage(Object msg) {
428 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
429 return filterOutboundMessage0(msg, DomainSocketAddress.class, EXPECTED_TYPES_DOMAIN);
430 } else {
431 return filterOutboundMessage0(msg, InetSocketAddress.class, EXPECTED_TYPES);
432 }
433 }
434
435 private Object filterOutboundMessage0(Object msg, Class<? extends SocketAddress> recipientClass,
436 String expectedTypes) {
437 if (msg instanceof DatagramPacket) {
438 DatagramPacket packet = (DatagramPacket) msg;
439 if (recipientClass.isInstance(packet.recipient())) {
440 Buffer content = packet.content();
441 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
442 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
443 }
444 } else if (msg instanceof Buffer) {
445 Buffer buf = (Buffer) msg;
446 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
447 } else if (msg instanceof AddressedEnvelope) {
448 @SuppressWarnings("unchecked")
449 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
450 SocketAddress recipient = e.recipient();
451 if (recipient == null || recipientClass.isInstance(recipient)) {
452 if (e.content() instanceof Buffer) {
453 Buffer buf = (Buffer) e.content();
454 if (UnixChannelUtil.isBufferCopyNeededForWrite(buf)) {
455 try {
456 return new DefaultBufferAddressedEnvelope<>(newDirectBuffer(null, buf), recipient);
457 } finally {
458 SilentDispose.dispose(e, logger);
459 }
460 }
461 return e;
462 }
463 }
464 }
465 throw new UnsupportedOperationException(
466 "unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
467 }
468
469 @Override
470 protected void doDisconnect() throws Exception {
471 socket.disconnect();
472 connected = active = false;
473 resetCachedAddresses();
474 }
475
476 @Override
477 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
478 if (super.doConnect(remoteAddress, localAddress)) {
479 connected = true;
480 return true;
481 }
482 return false;
483 }
484
485 @Override
486 protected void doClose() throws Exception {
487 super.doClose();
488 connected = false;
489 }
490
491 @Override
492 void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
493 Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
494 final ChannelPipeline pipeline = pipeline();
495
496 Throwable exception = null;
497 Buffer buffer = null;
498 try {
499 boolean connected = isConnected();
500 do {
501 buffer = allocHandle.allocate(recvBufferAllocator);
502 allocHandle.attemptedBytesRead(buffer.writableBytes());
503
504 final DatagramPacket packet;
505 if (connected) {
506 try {
507 allocHandle.lastBytesRead(doReadBytes(buffer));
508 } catch (Errors.NativeIoException e) {
509
510 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
511 PortUnreachableException error = new PortUnreachableException(e.getMessage());
512 error.initCause(e);
513 throw error;
514 }
515 throw e;
516 }
517 if (allocHandle.lastBytesRead() <= 0) {
518
519 buffer.close();
520 buffer = null;
521 break;
522 }
523 packet = new DatagramPacket(buffer, localAddress(), remoteAddress());
524 } else {
525 SocketAddress localAddress = null;
526 SocketAddress remoteAddress = null;
527 int bytesRead = 0;
528 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
529 final RecvFromAddressDomainSocket recvFrom = new RecvFromAddressDomainSocket(socket);
530 buffer.forEachWritable(0, recvFrom);
531 DomainDatagramSocketAddress recvAddress = recvFrom.remoteAddress();
532 if (recvAddress != null) {
533 remoteAddress = recvAddress;
534 bytesRead = recvAddress.receivedAmount();
535 localAddress = recvAddress.localAddress();
536 }
537 } else {
538 try (var iterator = buffer.forEachWritable()) {
539 var component = iterator.first();
540 long addr = component.writableNativeAddress();
541 DatagramSocketAddress datagramSocketAddress;
542 if (addr != 0) {
543
544 datagramSocketAddress = socket.recvFromAddress(addr, 0, component.writableBytes());
545 } else {
546 ByteBuffer nioData = component.writableBuffer();
547 datagramSocketAddress = socket.recvFrom(
548 nioData, nioData.position(), nioData.limit());
549 }
550 if (datagramSocketAddress != null) {
551 remoteAddress = datagramSocketAddress;
552 localAddress = datagramSocketAddress.localAddress();
553 bytesRead = datagramSocketAddress.receivedAmount();
554 }
555 }
556 }
557
558 if (remoteAddress == null) {
559 allocHandle.lastBytesRead(-1);
560 buffer.close();
561 break;
562 }
563 if (localAddress == null) {
564 localAddress = localAddress();
565 }
566 allocHandle.lastBytesRead(bytesRead);
567 buffer.skipWritableBytes(allocHandle.lastBytesRead());
568
569 packet = new DatagramPacket(buffer, localAddress, remoteAddress);
570 }
571
572 allocHandle.incMessagesRead(1);
573
574 readPending = false;
575 pipeline.fireChannelRead(packet);
576
577 buffer = null;
578
579
580
581 } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
582 } catch (Throwable t) {
583 if (buffer != null) {
584 buffer.close();
585 }
586 exception = t;
587 }
588
589 allocHandle.readComplete();
590 pipeline.fireChannelReadComplete();
591
592 if (exception != null) {
593 pipeline.fireChannelExceptionCaught(exception);
594 } else {
595 readIfIsAutoRead();
596 }
597 }
598
599 private <V> Future<V> newMulticastNotSupportedFuture() {
600 return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
601 }
602
603 @Override
604 public Future<Void> joinGroup(InetAddress multicastAddress) {
605 requireNonNull(multicastAddress, "multicast");
606 return newMulticastNotSupportedFuture();
607 }
608
609 @Override
610 public Future<Void> joinGroup(
611 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
612 requireNonNull(multicastAddress, "multicastAddress");
613 requireNonNull(networkInterface, "networkInterface");
614
615 return newMulticastNotSupportedFuture();
616 }
617
618 @Override
619 public Future<Void> leaveGroup(InetAddress multicastAddress) {
620 requireNonNull(multicastAddress, "multicast");
621 return newMulticastNotSupportedFuture();
622 }
623
624 @Override
625 public Future<Void> leaveGroup(
626 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
627 requireNonNull(multicastAddress, "multicastAddress");
628 requireNonNull(networkInterface, "networkInterface");
629
630 return newMulticastNotSupportedFuture();
631 }
632
633 @Override
634 public Future<Void> block(
635 InetAddress multicastAddress, NetworkInterface networkInterface,
636 InetAddress sourceToBlock) {
637 requireNonNull(multicastAddress, "multicastAddress");
638 requireNonNull(sourceToBlock, "sourceToBlock");
639 requireNonNull(networkInterface, "networkInterface");
640
641 return newMulticastNotSupportedFuture();
642 }
643
644 @Override
645 public Future<Void> block(InetAddress multicastAddress, InetAddress sourceToBlock) {
646 requireNonNull(multicastAddress, "multicastAddress");
647 requireNonNull(sourceToBlock, "sourceToBlock");
648
649 return newMulticastNotSupportedFuture();
650 }
651
652 @Override
653 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
654 int maxMessagesPerWrite = getMaxMessagesPerWrite();
655 while (maxMessagesPerWrite > 0) {
656 Object msg = in.current();
657 if (msg == null) {
658 break;
659 }
660
661 try {
662 boolean done = false;
663 for (int i = getWriteSpinCount(); i > 0; --i) {
664 if (doWriteMessage(msg)) {
665 done = true;
666 break;
667 }
668 }
669
670 if (done) {
671 in.remove();
672 maxMessagesPerWrite--;
673 } else {
674 break;
675 }
676 } catch (IOException e) {
677 maxMessagesPerWrite--;
678
679
680
681
682 in.remove(e);
683 }
684 }
685
686
687 writeFilter(!in.isEmpty());
688 }
689
690 @Override
691 protected void doShutdown(ChannelShutdownDirection direction) {
692 switch (direction) {
693 case Inbound:
694 inputShutdown = true;
695 break;
696 case Outbound:
697 outputShutdown = true;
698 break;
699 default:
700 throw new AssertionError();
701 }
702 }
703
704 @Override
705 public boolean isShutdown(ChannelShutdownDirection direction) {
706 if (!isActive()) {
707 return true;
708 }
709 switch (direction) {
710 case Inbound:
711 return inputShutdown;
712 case Outbound:
713 return outputShutdown;
714 default:
715 throw new AssertionError();
716 }
717 }
718 }