1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultAddressedEnvelope;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramChannelConfig;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.SocketProtocolFamily;
32 import io.netty.channel.unix.Errors;
33 import io.netty.channel.unix.Errors.NativeIoException;
34 import io.netty.channel.unix.SegmentedDatagramPacket;
35 import io.netty.channel.unix.Socket;
36 import io.netty.util.UncheckedBooleanSupplier;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.SystemPropertyUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.Inet4Address;
45 import java.net.InetAddress;
46 import java.net.InetSocketAddress;
47 import java.net.NetworkInterface;
48 import java.net.PortUnreachableException;
49 import java.net.SocketAddress;
50 import java.nio.channels.UnresolvedAddressException;
51
52 import static io.netty.channel.unix.Errors.ioResult;
53
54 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
55 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
56 private static final boolean IP_MULTICAST_ALL =
57 SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
58 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
59 private static final String EXPECTED_TYPES =
60 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
61 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
62 StringUtil.simpleClassName(ByteBuf.class) + ", " +
63 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
64 StringUtil.simpleClassName(ByteBuf.class) + ')';
65
66 private final IoUringDatagramChannelConfig config;
67 private volatile boolean connected;
68
69 static {
70 if (logger.isDebugEnabled()) {
71 logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
72 }
73 }
74
75
76
77
78
79
80
81 private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
82 private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
83 private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
84
85
86
87
88
89 public IoUringDatagramChannel() {
90 this(null);
91 }
92
93
94
95
96
97 public IoUringDatagramChannel(SocketProtocolFamily family) {
98 this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
99 }
100
101 private static boolean useIpv6(SocketProtocolFamily family) {
102 if (family == null) {
103 return Socket.isIPv6Preferred();
104 }
105 return family == SocketProtocolFamily.INET6;
106 }
107
108
109
110
111
112 public IoUringDatagramChannel(int fd) {
113 this(new LinuxSocket(fd), true);
114 }
115
116 private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
117
118 super(null, fd, active);
119
120
121 try {
122 fd.setIpMulticastAll(IP_MULTICAST_ALL);
123 } catch (IOException | ChannelException e) {
124 logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
125 }
126
127 config = new IoUringDatagramChannelConfig(this);
128 }
129
130 @Override
131 protected boolean isStreamSocket() {
132 return false;
133 }
134
135 @Override
136 public InetSocketAddress remoteAddress() {
137 return (InetSocketAddress) super.remoteAddress();
138 }
139
140 @Override
141 public InetSocketAddress localAddress() {
142 return (InetSocketAddress) super.localAddress();
143 }
144
145 @Override
146 public ChannelMetadata metadata() {
147 return METADATA;
148 }
149
150 @Override
151 public boolean isActive() {
152 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
153 }
154
155 @Override
156 public boolean isConnected() {
157 return connected;
158 }
159
160 @Override
161 public ChannelFuture joinGroup(InetAddress multicastAddress) {
162 return joinGroup(multicastAddress, newPromise());
163 }
164
165 @Override
166 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
167 try {
168 return joinGroup(
169 multicastAddress,
170 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
171 } catch (IOException e) {
172 promise.setFailure(e);
173 }
174 return promise;
175 }
176
177 @Override
178 public ChannelFuture joinGroup(
179 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
180 return joinGroup(multicastAddress, networkInterface, newPromise());
181 }
182
183 @Override
184 public ChannelFuture joinGroup(
185 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
186 ChannelPromise promise) {
187 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
188 }
189
190 @Override
191 public ChannelFuture joinGroup(
192 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
193 return joinGroup(multicastAddress, networkInterface, source, newPromise());
194 }
195
196 @Override
197 public ChannelFuture joinGroup(
198 final InetAddress multicastAddress, final NetworkInterface networkInterface,
199 final InetAddress source, final ChannelPromise promise) {
200
201 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
202 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
203
204 try {
205 socket.joinGroup(multicastAddress, networkInterface, source);
206 promise.setSuccess();
207 } catch (IOException e) {
208 promise.setFailure(e);
209 }
210 return promise;
211 }
212
213 @Override
214 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
215 return leaveGroup(multicastAddress, newPromise());
216 }
217
218 @Override
219 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
220 try {
221 return leaveGroup(
222 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
223 } catch (IOException e) {
224 promise.setFailure(e);
225 }
226 return promise;
227 }
228
229 @Override
230 public ChannelFuture leaveGroup(
231 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
232 return leaveGroup(multicastAddress, networkInterface, newPromise());
233 }
234
235 @Override
236 public ChannelFuture leaveGroup(
237 InetSocketAddress multicastAddress,
238 NetworkInterface networkInterface, ChannelPromise promise) {
239 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
240 }
241
242 @Override
243 public ChannelFuture leaveGroup(
244 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
245 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
246 }
247
248 @Override
249 public ChannelFuture leaveGroup(
250 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
251 final ChannelPromise promise) {
252 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
253 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
254
255 try {
256 socket.leaveGroup(multicastAddress, networkInterface, source);
257 promise.setSuccess();
258 } catch (IOException e) {
259 promise.setFailure(e);
260 }
261 return promise;
262 }
263
264 @Override
265 public ChannelFuture block(
266 InetAddress multicastAddress, NetworkInterface networkInterface,
267 InetAddress sourceToBlock) {
268 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
269 }
270
271 @Override
272 public ChannelFuture block(
273 final InetAddress multicastAddress, final NetworkInterface networkInterface,
274 final InetAddress sourceToBlock, final ChannelPromise promise) {
275 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
276 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
277 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
278
279 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
280 return promise;
281 }
282
283 @Override
284 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
285 return block(multicastAddress, sourceToBlock, newPromise());
286 }
287
288 @Override
289 public ChannelFuture block(
290 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
291 try {
292 return block(
293 multicastAddress,
294 NetworkInterface.getByInetAddress(localAddress().getAddress()),
295 sourceToBlock, promise);
296 } catch (Throwable e) {
297 promise.setFailure(e);
298 }
299 return promise;
300 }
301
302 @Override
303 protected AbstractUnsafe newUnsafe() {
304 return new IoUringDatagramChannelUnsafe();
305 }
306
307 @Override
308 protected void doBind(SocketAddress localAddress) throws Exception {
309 if (localAddress instanceof InetSocketAddress) {
310 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
311 if (socketAddress.getAddress().isAnyLocalAddress() &&
312 socketAddress.getAddress() instanceof Inet4Address) {
313 if (socket.family() == SocketProtocolFamily.INET6) {
314 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
315 }
316 }
317 }
318 super.doBind(localAddress);
319 active = true;
320 }
321
322 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
323 if (envelope.recipient() instanceof InetSocketAddress
324 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
325 throw new UnresolvedAddressException();
326 }
327 }
328
329 @Override
330 protected Object filterOutboundMessage(Object msg) {
331 if (msg instanceof DatagramPacket) {
332 DatagramPacket packet = (DatagramPacket) msg;
333 checkUnresolved(packet);
334 ByteBuf content = packet.content();
335 return !content.hasMemoryAddress() ?
336 packet.replace(newDirectBuffer(packet, content)) : msg;
337 }
338
339 if (msg instanceof ByteBuf) {
340 ByteBuf buf = (ByteBuf) msg;
341 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
342 }
343
344 if (msg instanceof AddressedEnvelope) {
345 @SuppressWarnings("unchecked")
346 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
347 checkUnresolved(e);
348 if (e.content() instanceof ByteBuf &&
349 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
350
351 ByteBuf content = (ByteBuf) e.content();
352 return !content.hasMemoryAddress()?
353 new DefaultAddressedEnvelope<>(
354 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
355 }
356 }
357
358 throw new UnsupportedOperationException(
359 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
360 }
361
362 @Override
363 public DatagramChannelConfig config() {
364 return config;
365 }
366
367 @Override
368 protected void doDisconnect() throws Exception {
369
370 socket.disconnect();
371 connected = active = false;
372
373 resetCachedAddresses();
374 }
375
376 @Override
377 protected void doClose() throws Exception {
378 super.doClose();
379 connected = false;
380 }
381
382 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
383 private final WriteProcessor writeProcessor = new WriteProcessor();
384
385 private ByteBuf readBuffer;
386
387 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
388 private int written;
389 @Override
390 public boolean processMessage(Object msg) {
391 if (scheduleWrite(msg, written == 0)) {
392 written++;
393 return true;
394 }
395 return false;
396 }
397
398 int write(ChannelOutboundBuffer in) {
399 written = 0;
400 try {
401 in.forEachFlushedMessage(this);
402 } catch (Exception e) {
403
404 throw new IllegalStateException(e);
405 }
406 return written;
407 }
408 }
409
410 @Override
411 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
412 assert outstanding != -1 : "multi-shot not implemented yet";
413
414 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
415 final ChannelPipeline pipeline = pipeline();
416 ByteBuf byteBuf = this.readBuffer;
417 assert byteBuf != null;
418 MsgHdrMemory hdr = recvmsgHdrs.hdr(data);
419
420 recvmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
421
422 try {
423 if (res < 0) {
424 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
425
426
427 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
428 }
429 } else {
430 allocHandle.lastBytesRead(res);
431 if (hdr.hasPort(IoUringDatagramChannel.this)) {
432 allocHandle.incMessagesRead(1);
433 DatagramPacket packet = hdr.get(
434 IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
435 pipeline.fireChannelRead(packet);
436 }
437 }
438 } catch (Throwable t) {
439 Throwable e = (connected && t instanceof NativeIoException) ?
440 translateForConnected((NativeIoException) t) : t;
441 pipeline.fireExceptionCaught(e);
442 }
443
444 if (outstanding == 0) {
445
446
447 this.readBuffer.release();
448 this.readBuffer = null;
449 recvmsgHdrs.clear();
450
451 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
452 if (allocHandle.lastBytesRead() > 0 &&
453 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
454
455
456
457
458 (!IoUring.isCqeFSockNonEmptySupported() ||
459 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
460
461 scheduleRead(false);
462 } else {
463
464 allocHandle.readComplete();
465 pipeline.fireChannelReadComplete();
466 }
467 }
468 }
469 }
470
471 @Override
472 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
473 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
474 ByteBuf byteBuf = allocHandle.allocate(alloc());
475 assert readBuffer == null;
476 readBuffer = byteBuf;
477
478 int writable = byteBuf.writableBytes();
479 allocHandle.attemptedBytesRead(writable);
480 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
481
482 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
483
484 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
485 if (scheduled == 0) {
486
487
488 readBuffer = null;
489 byteBuf.release();
490 }
491 return scheduled;
492 }
493
494 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
495 int writable = byteBuf.writableBytes();
496 long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
497 if (numDatagram <= 1) {
498 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
499 }
500 int i = 0;
501
502 for (; i < numDatagram && writable >= datagramSize; i++) {
503 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
504 break;
505 }
506 bufferAddress += datagramSize;
507 writable -= datagramSize;
508 }
509 return i;
510 }
511
512 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
513 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
514 if (msgHdrMemory == null) {
515
516 return false;
517 }
518 msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
519
520 int fd = fd().intValue();
521 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
522 IoRegistration registration = registration();
523
524
525 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
526 fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
527 long id = registration.submit(ops);
528 if (id == 0) {
529
530 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
531 return false;
532 }
533 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
534 return true;
535 }
536
537 @Override
538 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
539 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
540
541
542 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
543 sendmsgResArray[data] = res;
544
545 if (outstanding == 0) {
546
547 boolean writtenSomething = false;
548 int numWritten = sendmsgHdrs.length();
549 sendmsgHdrs.clear();
550 for (int i = 0; i < numWritten; i++) {
551 writtenSomething |= removeFromOutboundBuffer(
552 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
553 }
554 return writtenSomething;
555 }
556 return true;
557 }
558
559 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
560 if (res >= 0) {
561
562 return outboundBuffer.remove();
563 }
564 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565 return false;
566 }
567 try {
568 return ioResult(errormsg, res) != 0;
569 } catch (Throwable cause) {
570 Throwable e = (connected && cause instanceof NativeIoException) ?
571 translateForConnected((NativeIoException) cause) : cause;
572 return outboundBuffer.remove(e);
573 }
574 }
575
576 @Override
577 void connectComplete(byte op, int res, int flags, short data) {
578 if (res >= 0) {
579 connected = true;
580 }
581 super.connectComplete(op, res, flags, data);
582 }
583
584 @Override
585 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
586 return writeProcessor.write(in);
587 }
588
589 @Override
590 protected int scheduleWriteSingle(Object msg) {
591 return scheduleWrite(msg, true) ? 1 : 0;
592 }
593
594 private boolean scheduleWrite(Object msg, boolean first) {
595 final ByteBuf data;
596 final InetSocketAddress remoteAddress;
597 final int segmentSize;
598 if (msg instanceof AddressedEnvelope) {
599 @SuppressWarnings("unchecked")
600 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
601 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
602 data = envelope.content();
603 remoteAddress = envelope.recipient();
604 if (msg instanceof SegmentedDatagramPacket) {
605 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
606 } else {
607 segmentSize = 0;
608 }
609 } else {
610 data = (ByteBuf) msg;
611 remoteAddress = (InetSocketAddress) remoteAddress();
612 segmentSize = 0;
613 }
614
615 long bufferAddress = IoUring.memoryAddress(data);
616 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
617 }
618
619 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
620 int bufferLength, int segmentSize, boolean first) {
621 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
622 if (hdr == null) {
623
624
625 return false;
626 }
627 hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
628
629 int fd = fd().intValue();
630 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
631 IoRegistration registration = registration();
632 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
633 long id = registration.submit(ops);
634 if (id == 0) {
635
636 sendmsgHdrs.restoreNextHdr(hdr);
637 return false;
638 }
639 sendmsgHdrs.setId(hdr.idx(), id);
640 return true;
641 }
642
643 @Override
644 public void unregistered() {
645 super.unregistered();
646 sendmsgHdrs.release();
647 recvmsgHdrs.release();
648 assert readBuffer == null;
649 }
650 }
651
652 private static IOException translateForConnected(NativeIoException e) {
653
654 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
655 PortUnreachableException error = new PortUnreachableException(e.getMessage());
656 error.initCause(e);
657 return error;
658 }
659 return e;
660 }
661
662
663
664
665
666
667 public static boolean isSegmentedDatagramPacketSupported() {
668 return IoUring.isAvailable();
669 }
670
671 @Override
672 protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
673 if (numOutstandingReads > 0) {
674 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
675 assert canceled == numOutstandingReads;
676 }
677 }
678
679 @Override
680 protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
681 if (numOutstandingWrites > 0) {
682 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
683 assert canceled == numOutstandingWrites;
684 }
685 }
686
687 private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
688 int cancelled = 0;
689 for (int idx = 0; idx < array.length(); idx++) {
690 long id = array.id(idx);
691 if (id == MsgHdrMemoryArray.NO_ID) {
692 continue;
693 }
694
695
696 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
697 registration.submit(ops);
698 cancelled++;
699 }
700 return cancelled;
701 }
702
703 @Override
704 protected boolean socketIsEmpty(int flags) {
705 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
706 }
707
708 @Override
709 boolean isPollInFirst() {
710 return false;
711 }
712 }