1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.socket.nio;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.ChannelShutdownDirection;
20 import io.netty5.channel.FixedRecvBufferAllocator;
21 import io.netty5.util.Resource;
22 import io.netty5.buffer.api.WritableComponent;
23 import io.netty5.buffer.api.WritableComponentProcessor;
24 import io.netty5.channel.AddressedEnvelope;
25 import io.netty5.channel.Channel;
26 import io.netty5.channel.ChannelException;
27 import io.netty5.channel.ChannelMetadata;
28 import io.netty5.channel.ChannelOption;
29 import io.netty5.channel.ChannelOutboundBuffer;
30 import io.netty5.channel.DefaultBufferAddressedEnvelope;
31 import io.netty5.channel.EventLoop;
32 import io.netty5.channel.RecvBufferAllocator;
33 import io.netty5.channel.RecvBufferAllocator.Handle;
34 import io.netty5.channel.nio.AbstractNioMessageChannel;
35 import io.netty5.channel.socket.DatagramPacket;
36 import io.netty5.util.concurrent.Future;
37 import io.netty5.util.internal.PlatformDependent;
38 import io.netty5.util.internal.SocketUtils;
39 import io.netty5.util.internal.StringUtil;
40 import io.netty5.util.internal.logging.InternalLogger;
41 import io.netty5.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.InetAddress;
45 import java.net.InetSocketAddress;
46 import java.net.NetworkInterface;
47 import java.net.ProtocolFamily;
48 import java.net.SocketAddress;
49 import java.net.SocketException;
50 import java.net.SocketOption;
51 import java.net.StandardSocketOptions;
52 import java.nio.ByteBuffer;
53 import java.nio.channels.DatagramChannel;
54 import java.nio.channels.MembershipKey;
55 import java.nio.channels.SelectionKey;
56 import java.nio.channels.spi.SelectorProvider;
57 import java.util.ArrayList;
58 import java.util.HashMap;
59 import java.util.Iterator;
60 import java.util.List;
61 import java.util.Map;
62 import java.util.function.Predicate;
63
64 import static io.netty5.channel.ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION;
65
66 import static io.netty5.channel.socket.nio.NioChannelUtil.isDomainSocket;
67 import static io.netty5.channel.socket.nio.NioChannelUtil.toDomainSocketAddress;
68 import static io.netty5.channel.socket.nio.NioChannelUtil.toJdkFamily;
69 import static io.netty5.channel.socket.nio.NioChannelUtil.toUnixDomainSocketAddress;
70 import static java.util.Objects.requireNonNull;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public final class NioDatagramChannel
97 extends AbstractNioMessageChannel<Channel, SocketAddress, SocketAddress>
98 implements io.netty5.channel.socket.DatagramChannel {
99 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class);
100
101 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
102 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
103 private static final String EXPECTED_TYPES =
104 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
105 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
106 StringUtil.simpleClassName(Buffer.class) + ", " +
107 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
108 StringUtil.simpleClassName(Buffer.class) + ')';
109
110 private static final Predicate<Handle> TRUE_SUPPLIER = h -> true;
111
112 private final ProtocolFamily family;
113
114 private volatile boolean inputShutdown;
115 private volatile boolean outputShutdown;
116
117 private Map<InetAddress, List<MembershipKey>> memberships;
118
119 private volatile boolean activeOnOpen;
120 private volatile boolean bound;
121
122 private static DatagramChannel newSocket(SelectorProvider provider) {
123 try {
124
125
126
127 return provider.openDatagramChannel();
128 } catch (IOException e) {
129 throw new ChannelException("Failed to open a socket.", e);
130 }
131 }
132
133 private static DatagramChannel newSocket(SelectorProvider provider, ProtocolFamily family) {
134 if (family == null) {
135 return newSocket(provider);
136 }
137 try {
138 return provider.openDatagramChannel(family);
139 } catch (IOException e) {
140 throw new ChannelException("Failed to open a socket.", e);
141 }
142 }
143
144
145
146
147 public NioDatagramChannel(EventLoop eventLoop) {
148 this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER), null);
149 }
150
151
152
153
154
155 public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider) {
156 this(eventLoop, newSocket(provider), null);
157 }
158
159
160
161
162
163 public NioDatagramChannel(EventLoop eventLoop, ProtocolFamily family) {
164 this(eventLoop, DEFAULT_SELECTOR_PROVIDER, family);
165 }
166
167
168
169
170
171
172 public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider, ProtocolFamily family) {
173 this(eventLoop, newSocket(provider, toJdkFamily(family)), family);
174 }
175
176
177
178
179 public NioDatagramChannel(EventLoop eventLoop, DatagramChannel socket, ProtocolFamily family) {
180 super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048), socket, SelectionKey.OP_READ);
181 this.family = toJdkFamily(family);
182 }
183
184 @SuppressWarnings("unchecked")
185 @Override
186 protected <T> T getExtendedOption(ChannelOption<T> option) {
187 if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
188 return (T) Boolean.valueOf(isActiveOnOpen());
189 }
190 SocketOption<T> socketOption = NioChannelOption.toSocketOption(option);
191 if (socketOption != null) {
192 return NioChannelOption.getOption(javaChannel(), socketOption);
193 }
194 return super.getExtendedOption(option);
195 }
196
197 @Override
198 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
199 if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
200 setActiveOnOpen((Boolean) value);
201 } else {
202 SocketOption<T> socketOption = NioChannelOption.toSocketOption(option);
203 if (socketOption != null) {
204 try {
205
206 if (socketOption == StandardSocketOptions.SO_BROADCAST &&
207 !isAnyLocalAddress() &&
208 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
209
210
211 logger.warn(
212 "A non-root user can't receive a broadcast packet if the socket " +
213 "is not bound to a wildcard address; setting the SO_BROADCAST flag " +
214 "anyway as requested on the socket which is bound to " +
215 javaChannel().getLocalAddress() + '.');
216 }
217 NioChannelOption.setOption(javaChannel(), socketOption, value);
218 } catch (IOException e) {
219 throw new ChannelException(e);
220 }
221 } else {
222 super.setExtendedOption(option, value);
223 }
224 }
225 }
226
227 @Override
228 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
229 if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
230 return true;
231 }
232 SocketOption<?> socketOption = NioChannelOption.toSocketOption(option);
233 if (socketOption != null) {
234 return NioChannelOption.isOptionSupported(javaChannel(), socketOption);
235 }
236 return super.isExtendedOptionSupported(option);
237 }
238
239 private boolean isActiveOnOpen() {
240 return activeOnOpen;
241 }
242
243 private void setActiveOnOpen(boolean activeOnOpen) {
244 if (isRegistered()) {
245 throw new IllegalStateException("Can only changed before channel was registered");
246 }
247 this.activeOnOpen = activeOnOpen;
248 }
249
250 private boolean isAnyLocalAddress() throws IOException {
251 SocketAddress address = javaChannel().getLocalAddress();
252 return address instanceof InetSocketAddress && ((InetSocketAddress) address).getAddress().isAnyLocalAddress();
253 }
254
255 private NetworkInterface getNetworkInterface() {
256 try {
257 return javaChannel().getOption(StandardSocketOptions.IP_MULTICAST_IF);
258 } catch (IOException e) {
259 throw new ChannelException(e);
260 }
261 }
262
263 @Override
264 protected void doShutdown(ChannelShutdownDirection direction) {
265 switch (direction) {
266 case Inbound:
267 inputShutdown = true;
268 break;
269 case Outbound:
270 outputShutdown = true;
271 break;
272 default:
273 throw new AssertionError();
274 }
275 }
276
277 @Override
278 public boolean isShutdown(ChannelShutdownDirection direction) {
279 if (!isActive()) {
280 return true;
281 }
282 switch (direction) {
283 case Inbound:
284 return inputShutdown;
285 case Outbound:
286 return outputShutdown;
287 default:
288 throw new AssertionError();
289 }
290 }
291
292 @Override
293 @SuppressWarnings("deprecation")
294 public boolean isActive() {
295 DatagramChannel ch = javaChannel();
296 return ch.isOpen() && (getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
297 || bound);
298 }
299
300 @Override
301 public boolean isConnected() {
302 return javaChannel().isConnected();
303 }
304
305 @Override
306 protected DatagramChannel javaChannel() {
307 return (DatagramChannel) super.javaChannel();
308 }
309
310 @Override
311 protected SocketAddress localAddress0() {
312 try {
313 SocketAddress address = javaChannel().getLocalAddress();
314 if (isDomainSocket(family)) {
315 return toDomainSocketAddress(address);
316 }
317 return address;
318 } catch (IOException e) {
319
320 return null;
321 }
322 }
323
324 @Override
325 protected SocketAddress remoteAddress0() {
326 try {
327 SocketAddress address = javaChannel().getRemoteAddress();
328 if (isDomainSocket(family)) {
329 return toDomainSocketAddress(address);
330 }
331 return address;
332 } catch (IOException e) {
333
334 return null;
335 }
336 }
337
338 @Override
339 protected void doBind(SocketAddress localAddress) throws Exception {
340 doBind0(localAddress);
341 }
342
343 private void doBind0(SocketAddress localAddress) throws Exception {
344 if (isDomainSocket(family)) {
345 localAddress = toUnixDomainSocketAddress(localAddress);
346 }
347 SocketUtils.bind(javaChannel(), localAddress);
348 bound = true;
349 }
350
351 @Override
352 protected boolean doConnect(SocketAddress remoteAddress,
353 SocketAddress localAddress) throws Exception {
354 if (localAddress != null) {
355 doBind0(localAddress);
356 }
357
358 boolean success = false;
359 try {
360 javaChannel().connect(remoteAddress);
361
362 bound = true;
363 success = true;
364 return true;
365 } finally {
366 if (!success) {
367 doClose();
368 }
369 }
370 }
371
372 @Override
373 protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
374 return true;
375 }
376
377 @Override
378 protected void doDisconnect() throws Exception {
379 javaChannel().disconnect();
380 }
381
382 @Override
383 protected int doReadMessages(List<Object> buf) throws Exception {
384 RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
385
386 return doReadBufferMessages(allocHandle, buf);
387 }
388
389 private int doReadBufferMessages(Handle allocHandle, List<Object> buf) throws IOException {
390 Buffer data = allocHandle.allocate(bufferAllocator());
391 allocHandle.attemptedBytesRead(data.writableBytes());
392 boolean free = true;
393 try {
394 ReceiveDatagram receiveDatagram = new ReceiveDatagram(javaChannel());
395 data.forEachWritable(0, receiveDatagram);
396 SocketAddress remoteAddress = receiveDatagram.remoteAddress;
397 if (remoteAddress == null) {
398 return 0;
399 }
400
401 allocHandle.lastBytesRead(receiveDatagram.bytesReceived);
402 data.skipWritableBytes(allocHandle.lastBytesRead());
403 buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
404 free = false;
405 return 1;
406 } finally {
407 if (free) {
408 data.close();
409 }
410 }
411 }
412
413 @Override
414 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
415 final SocketAddress remoteAddress;
416 final Object data;
417 if (msg instanceof AddressedEnvelope) {
418 @SuppressWarnings("unchecked")
419 AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
420 remoteAddress = envelope.recipient();
421 data = envelope.content();
422 } else {
423 data = msg;
424 remoteAddress = null;
425 }
426
427 Buffer buf = (Buffer) data;
428 final int length = buf.readableBytes();
429 if (length == 0) {
430 return true;
431 }
432
433 int initialReadable = buf.readableBytes();
434 buf.forEachReadable(0, (index, component) -> {
435 final int writtenBytes;
436 if (remoteAddress != null) {
437 writtenBytes = javaChannel().send(component.readableBuffer(), remoteAddress);
438 } else {
439 writtenBytes = javaChannel().write(component.readableBuffer());
440 }
441 component.skipReadableBytes(writtenBytes);
442 return true;
443 });
444 return buf.readableBytes() < initialReadable;
445 }
446
447 @Override
448 protected Object filterOutboundMessage(Object msg) {
449 if (msg instanceof DatagramPacket) {
450 DatagramPacket p = (DatagramPacket) msg;
451 Buffer content = p.content();
452 if (isSingleDirectBuffer(content)) {
453 return p;
454 }
455 return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
456 }
457
458 if (msg instanceof Buffer) {
459 Buffer buf = (Buffer) msg;
460 if (isSingleDirectBuffer(buf)) {
461 return buf;
462 }
463 return newDirectBuffer(buf);
464 }
465
466 if (msg instanceof AddressedEnvelope) {
467 @SuppressWarnings("unchecked")
468 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
469 Object content = e.content();
470 if (content instanceof Buffer) {
471 Buffer buf = (Buffer) content;
472 if (isSingleDirectBuffer(buf)) {
473 return e;
474 }
475 return new DefaultBufferAddressedEnvelope<>(newDirectBuffer((Resource<?>) e, buf), e.recipient());
476 }
477 }
478
479 throw new UnsupportedOperationException(
480 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
481 }
482
483
484
485
486
487 private static boolean isSingleDirectBuffer(Buffer buf) {
488 return buf.isDirect() && buf.countComponents() == 1;
489 }
490
491 @Override
492 protected boolean continueOnWriteError() {
493
494
495
496 return true;
497 }
498
499 private NetworkInterface networkInterface() throws SocketException {
500 NetworkInterface iface = getNetworkInterface();
501 if (iface == null) {
502 SocketAddress localAddress = localAddress();
503 if (localAddress instanceof InetSocketAddress) {
504 return NetworkInterface.getByInetAddress(((InetSocketAddress) localAddress()).getAddress());
505 }
506 throw new UnsupportedOperationException();
507 }
508 return iface;
509 }
510
511 @Override
512 public Future<Void> joinGroup(InetAddress multicastAddress) {
513 try {
514 return joinGroup(
515 multicastAddress, networkInterface(), null);
516 } catch (SocketException | UnsupportedOperationException e) {
517 return newFailedFuture(e);
518 }
519 }
520
521 @Override
522 public Future<Void> joinGroup(
523 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
524 requireNonNull(multicastAddress, "multicastAddress");
525 requireNonNull(networkInterface, "networkInterface");
526
527 try {
528 MembershipKey key;
529 if (source == null) {
530 key = javaChannel().join(multicastAddress, networkInterface);
531 } else {
532 key = javaChannel().join(multicastAddress, networkInterface, source);
533 }
534
535 synchronized (this) {
536 List<MembershipKey> keys = null;
537 if (memberships == null) {
538 memberships = new HashMap<>();
539 } else {
540 keys = memberships.get(multicastAddress);
541 }
542 if (keys == null) {
543 keys = new ArrayList<>();
544 memberships.put(multicastAddress, keys);
545 }
546 keys.add(key);
547 }
548
549 return newSucceededFuture();
550 } catch (Throwable e) {
551 return newFailedFuture(e);
552 }
553 }
554
555 @Override
556 public Future<Void> leaveGroup(InetAddress multicastAddress) {
557 try {
558 return leaveGroup(
559 multicastAddress, networkInterface(), null);
560 } catch (SocketException | UnsupportedOperationException e) {
561 return newFailedFuture(e);
562 }
563 }
564
565 @Override
566 public Future<Void> leaveGroup(
567 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
568 requireNonNull(multicastAddress, "multicastAddress");
569 requireNonNull(networkInterface, "networkInterface");
570
571 synchronized (this) {
572 if (memberships != null) {
573 List<MembershipKey> keys = memberships.get(multicastAddress);
574 if (keys != null) {
575 Iterator<MembershipKey> keyIt = keys.iterator();
576
577 while (keyIt.hasNext()) {
578 MembershipKey key = keyIt.next();
579 if (networkInterface.equals(key.networkInterface())) {
580 if (source == null && key.sourceAddress() == null ||
581 source != null && source.equals(key.sourceAddress())) {
582 key.drop();
583 keyIt.remove();
584 }
585 }
586 }
587 if (keys.isEmpty()) {
588 memberships.remove(multicastAddress);
589 }
590 }
591 }
592 }
593 return newSucceededFuture();
594 }
595
596
597
598
599 @Override
600 public Future<Void> block(
601 InetAddress multicastAddress, NetworkInterface networkInterface,
602 InetAddress sourceToBlock) {
603 requireNonNull(multicastAddress, "multicastAddress");
604 requireNonNull(sourceToBlock, "sourceToBlock");
605 requireNonNull(networkInterface, "networkInterface");
606
607 synchronized (this) {
608 if (memberships != null) {
609 List<MembershipKey> keys = memberships.get(multicastAddress);
610 for (MembershipKey key: keys) {
611 if (networkInterface.equals(key.networkInterface())) {
612 try {
613 key.block(sourceToBlock);
614 } catch (IOException e) {
615 return newFailedFuture(e);
616 }
617 }
618 }
619 }
620 }
621 return newSucceededFuture();
622 }
623
624
625
626
627 @Override
628 public Future<Void> block(InetAddress multicastAddress, InetAddress sourceToBlock) {
629 try {
630 return block(
631 multicastAddress, networkInterface(),
632 sourceToBlock);
633 } catch (SocketException | UnsupportedOperationException e) {
634 return newFailedFuture(e);
635 }
636 }
637
638 void clearReadPending0() {
639 clearReadPending();
640 }
641
642 @Override
643 protected boolean closeOnReadError(Throwable cause) {
644
645
646 if (cause instanceof SocketException) {
647 return false;
648 }
649 return super.closeOnReadError(cause);
650 }
651
652 @Override
653 protected boolean continueReading(RecvBufferAllocator.Handle allocHandle) {
654
655
656 return allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER);
657 }
658
659 private static final class ReceiveDatagram implements WritableComponentProcessor<IOException> {
660 private final DatagramChannel channel;
661 private SocketAddress remoteAddress;
662 private int bytesReceived;
663
664 ReceiveDatagram(DatagramChannel channel) {
665 this.channel = channel;
666 }
667
668 @Override
669 public boolean process(int index, WritableComponent component) throws IOException {
670 ByteBuffer dst = component.writableBuffer();
671 int position = dst.position();
672 remoteAddress = channel.receive(dst);
673 bytesReceived = dst.position() - position;
674 return false;
675 }
676 }
677 }