1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultAddressedEnvelope;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramChannelConfig;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.SocketProtocolFamily;
32 import io.netty.channel.unix.Errors;
33 import io.netty.channel.unix.Errors.NativeIoException;
34 import io.netty.channel.unix.SegmentedDatagramPacket;
35 import io.netty.channel.unix.Socket;
36 import io.netty.util.UncheckedBooleanSupplier;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.SystemPropertyUtil;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.net.Inet4Address;
45 import java.net.InetAddress;
46 import java.net.InetSocketAddress;
47 import java.net.NetworkInterface;
48 import java.net.PortUnreachableException;
49 import java.net.SocketAddress;
50 import java.nio.channels.UnresolvedAddressException;
51
52 import static io.netty.channel.unix.Errors.ioResult;
53
54 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
/** Logger shared by all instances of this channel type. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
/**
 * Value handed to {@code setIpMulticastAll(...)} on the socket when a channel is constructed. Read from the
 * {@code io.netty.channel.iouring.ipMulticastAll} system property, defaulting to {@code false}.
 */
private static final boolean IP_MULTICAST_ALL =
        SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
/** Metadata instance returned by {@link #metadata()}. */
private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
/** Suffix appended to the "unsupported message type" error raised by {@link #filterOutboundMessage(Object)}. */
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
        StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
        StringUtil.simpleClassName(ByteBuf.class) + ", " +
        StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
        StringUtil.simpleClassName(ByteBuf.class) + ')';

/** Configuration object exposed through {@link #config()}. */
private final IoUringDatagramChannelConfig config;
/** Set once a connect completes with a non-negative result; cleared again on disconnect and close. */
private volatile boolean connected;
68
69 static {
70 if (logger.isDebugEnabled()) {
71 logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
72 }
73 }
74
75
76
77
78
79
80
// Header memory used for scheduled recvmsg operations (constructed with 256, presumably the number of slots).
private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
// Header memory used for scheduled sendmsg operations (constructed with 256, presumably the number of slots).
private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
// sendmsg results indexed by header index; filled per completion and processed once no writes are outstanding.
private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
84
85
86
87
88
/**
 * Creates a new channel without an explicit protocol family. A {@code null} family is passed on, in which case
 * {@code useIpv6(...)} decides based on {@link Socket#isIPv6Preferred()}.
 */
public IoUringDatagramChannel() {
    this((SocketProtocolFamily) null);
}
92
93
94
95
96
/**
 * Creates a new channel backed by a freshly created datagram socket. The channel is constructed with
 * {@code active == false}.
 *
 * @param family the protocol family to use; when {@code null}, {@code useIpv6(...)} falls back to
 *               {@link Socket#isIPv6Preferred()}
 */
public IoUringDatagramChannel(SocketProtocolFamily family) {
    this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
}
100
101 private static boolean useIpv6(SocketProtocolFamily family) {
102 if (family == null) {
103 return Socket.isIPv6Preferred();
104 }
105 return family == SocketProtocolFamily.INET6;
106 }
107
108
109
110
111
/**
 * Creates a new channel that wraps the given file descriptor in a {@code LinuxSocket}. The channel is
 * constructed with {@code active == true}.
 *
 * @param fd the file descriptor to wrap
 */
public IoUringDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}
115
/**
 * Shared constructor used by all public constructors.
 *
 * @param fd     the socket backing this channel
 * @param active forwarded unchanged to the superclass constructor
 */
private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
    // The first superclass argument is null (presumably the parent channel, which a datagram channel lacks).
    super(null, fd, active);

    // Applying IP_MULTICAST_ALL is best effort: a failure is logged at debug level and otherwise ignored,
    // so construction still succeeds.
    try {
        fd.setIpMulticastAll(IP_MULTICAST_ALL);
    } catch (IOException | ChannelException e) {
        logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
    }

    config = new IoUringDatagramChannelConfig(this);
}
129
130 @Override
131 public InetSocketAddress remoteAddress() {
132 return (InetSocketAddress) super.remoteAddress();
133 }
134
135 @Override
136 public InetSocketAddress localAddress() {
137 return (InetSocketAddress) super.localAddress();
138 }
139
/**
 * Returns the shared {@code METADATA} constant of this channel type.
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
144
145 @Override
146 public boolean isActive() {
147 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
148 }
149
/**
 * Returns the current value of the {@code connected} flag.
 */
@Override
public boolean isConnected() {
    return connected;
}
154
155 @Override
156 public ChannelFuture joinGroup(InetAddress multicastAddress) {
157 return joinGroup(multicastAddress, newPromise());
158 }
159
160 @Override
161 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
162 try {
163 return joinGroup(
164 multicastAddress,
165 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
166 } catch (IOException e) {
167 promise.setFailure(e);
168 }
169 return promise;
170 }
171
172 @Override
173 public ChannelFuture joinGroup(
174 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
175 return joinGroup(multicastAddress, networkInterface, newPromise());
176 }
177
178 @Override
179 public ChannelFuture joinGroup(
180 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
181 ChannelPromise promise) {
182 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
183 }
184
185 @Override
186 public ChannelFuture joinGroup(
187 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
188 return joinGroup(multicastAddress, networkInterface, source, newPromise());
189 }
190
191 @Override
192 public ChannelFuture joinGroup(
193 final InetAddress multicastAddress, final NetworkInterface networkInterface,
194 final InetAddress source, final ChannelPromise promise) {
195
196 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
197 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
198
199 try {
200 socket.joinGroup(multicastAddress, networkInterface, source);
201 promise.setSuccess();
202 } catch (IOException e) {
203 promise.setFailure(e);
204 }
205 return promise;
206 }
207
208 @Override
209 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
210 return leaveGroup(multicastAddress, newPromise());
211 }
212
213 @Override
214 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
215 try {
216 return leaveGroup(
217 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
218 } catch (IOException e) {
219 promise.setFailure(e);
220 }
221 return promise;
222 }
223
224 @Override
225 public ChannelFuture leaveGroup(
226 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
227 return leaveGroup(multicastAddress, networkInterface, newPromise());
228 }
229
230 @Override
231 public ChannelFuture leaveGroup(
232 InetSocketAddress multicastAddress,
233 NetworkInterface networkInterface, ChannelPromise promise) {
234 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
235 }
236
237 @Override
238 public ChannelFuture leaveGroup(
239 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
240 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
241 }
242
243 @Override
244 public ChannelFuture leaveGroup(
245 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
246 final ChannelPromise promise) {
247 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
248 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
249
250 try {
251 socket.leaveGroup(multicastAddress, networkInterface, source);
252 promise.setSuccess();
253 } catch (IOException e) {
254 promise.setFailure(e);
255 }
256 return promise;
257 }
258
259 @Override
260 public ChannelFuture block(
261 InetAddress multicastAddress, NetworkInterface networkInterface,
262 InetAddress sourceToBlock) {
263 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
264 }
265
266 @Override
267 public ChannelFuture block(
268 final InetAddress multicastAddress, final NetworkInterface networkInterface,
269 final InetAddress sourceToBlock, final ChannelPromise promise) {
270 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
271 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
272 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
273
274 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
275 return promise;
276 }
277
278 @Override
279 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
280 return block(multicastAddress, sourceToBlock, newPromise());
281 }
282
283 @Override
284 public ChannelFuture block(
285 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
286 try {
287 return block(
288 multicastAddress,
289 NetworkInterface.getByInetAddress(localAddress().getAddress()),
290 sourceToBlock, promise);
291 } catch (Throwable e) {
292 promise.setFailure(e);
293 }
294 return promise;
295 }
296
/**
 * Creates the datagram-specific unsafe implementation for this channel.
 */
@Override
protected AbstractUnsafe newUnsafe() {
    return new IoUringDatagramChannelUnsafe();
}
301
302 @Override
303 protected void doBind(SocketAddress localAddress) throws Exception {
304 if (localAddress instanceof InetSocketAddress) {
305 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
306 if (socketAddress.getAddress().isAnyLocalAddress() &&
307 socketAddress.getAddress() instanceof Inet4Address) {
308 if (socket.family() == SocketProtocolFamily.INET6) {
309 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
310 }
311 }
312 }
313 super.doBind(localAddress);
314 active = true;
315 }
316
317 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
318 if (envelope.recipient() instanceof InetSocketAddress
319 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
320 throw new UnresolvedAddressException();
321 }
322 }
323
324 @Override
325 protected Object filterOutboundMessage(Object msg) {
326 if (msg instanceof DatagramPacket) {
327 DatagramPacket packet = (DatagramPacket) msg;
328 checkUnresolved(packet);
329 ByteBuf content = packet.content();
330 return !content.hasMemoryAddress() ?
331 packet.replace(newDirectBuffer(packet, content)) : msg;
332 }
333
334 if (msg instanceof ByteBuf) {
335 ByteBuf buf = (ByteBuf) msg;
336 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
337 }
338
339 if (msg instanceof AddressedEnvelope) {
340 @SuppressWarnings("unchecked")
341 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
342 checkUnresolved(e);
343 if (e.content() instanceof ByteBuf &&
344 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
345
346 ByteBuf content = (ByteBuf) e.content();
347 return !content.hasMemoryAddress()?
348 new DefaultAddressedEnvelope<>(
349 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
350 }
351 }
352
353 throw new UnsupportedOperationException(
354 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
355 }
356
/**
 * Returns the configuration object created for this channel in its constructor.
 */
@Override
public DatagramChannelConfig config() {
    return config;
}
361
362 @Override
363 protected void doDisconnect() throws Exception {
364
365 socket.disconnect();
366 connected = active = false;
367
368 resetCachedAddresses();
369 }
370
/**
 * Closes the channel via the superclass and then clears the {@code connected} flag.
 *
 * @throws Exception if the superclass close fails; in that case the flag is left untouched
 */
@Override
protected void doClose() throws Exception {
    super.doClose();
    connected = false;
}
376
377 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
// Reused processor that schedules one write per flushed message (see scheduleWriteMultiple).
private final WriteProcessor writeProcessor = new WriteProcessor();

// Buffer shared by the currently scheduled recvmsg operations; null while no read is scheduled.
private ByteBuf readBuffer;
381
382 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
383 private int written;
384 @Override
385 public boolean processMessage(Object msg) {
386 if (scheduleWrite(msg, written == 0)) {
387 written++;
388 return true;
389 }
390 return false;
391 }
392
393 int write(ChannelOutboundBuffer in) {
394 written = 0;
395 try {
396 in.forEachFlushedMessage(this);
397 } catch (Exception e) {
398
399 throw new IllegalStateException(e);
400 }
401 return written;
402 }
403 }
404
405 @Override
406 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
407 assert outstanding != -1 : "multi-shot not implemented yet";
408
409 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
410 final ChannelPipeline pipeline = pipeline();
411 ByteBuf byteBuf = this.readBuffer;
412 assert byteBuf != null;
413 try {
414 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
415 } catch (Throwable t) {
416 Throwable e = (connected && t instanceof NativeIoException) ?
417 translateForConnected((NativeIoException) t) : t;
418 pipeline.fireExceptionCaught(e);
419 }
420 }
421
/**
 * Processes the result of one recvmsg operation and, once no completions of the batch are outstanding,
 * releases the shared read buffer and either schedules another read or signals read completion.
 *
 * @param pipeline    the pipeline to notify
 * @param allocHandle the handle used to track bytes/messages read
 * @param byteBuf     the buffer the datagram was received into
 * @param res         the result of the operation (negative values denote an error)
 * @param flags       the completion flags
 * @param idx         the index of the header used for this operation
 * @param outstanding the number of completions still outstanding
 * @throws IOException if {@code ioResult(...)} throws for a negative {@code res}
 */
private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
                              ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
        throws IOException {
    MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
    if (res < 0) {
        if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
            // A negative result other than "canceled" is handed to ioResult(...), whose return value is
            // recorded as the bytes read; it may also throw.
            // NOTE(review): if it throws, the id reset and buffer release below are skipped - confirm that
            // this cannot leave readBuffer set when outstanding == 0.
            allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
        }
    } else {
        allocHandle.lastBytesRead(res);
        // A packet is only propagated when the header reports a port.
        if (hdr.hasPort(IoUringDatagramChannel.this)) {
            allocHandle.incMessagesRead(1);
            DatagramPacket packet = hdr.get(
                    IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
            pipeline.fireChannelRead(packet);
        }
    }

    // This operation completed, so clear its id; cancel(...) skips entries whose id is NO_ID.
    recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
    if (outstanding == 0) {
        // Nothing of this batch is outstanding anymore: drop the shared read buffer and reset the headers
        // before deciding whether to read again.
        this.readBuffer.release();
        this.readBuffer = null;
        recvmsgHdrs.clear();

        if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
            if (allocHandle.lastBytesRead() > 0 &&
                    allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
                    // When IORING_CQE_F_SOCK_NONEMPTY is supported, only read again if the flag is set.
                    // Without the flag the socket is presumably empty, so another read would find nothing.
                    (!IoUring.isCqeFSockNonEmptySupported() ||
                            (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
                // Schedule another read.
                scheduleRead(false);
            } else {
                // Stop reading for now and notify the pipeline.
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
        }
    }
}
470
471 @Override
472 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
473 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
474 ByteBuf byteBuf = allocHandle.allocate(alloc());
475 assert readBuffer == null;
476 readBuffer = byteBuf;
477
478 int writable = byteBuf.writableBytes();
479 allocHandle.attemptedBytesRead(writable);
480 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
481
482 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
483
484 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
485 if (scheduled == 0) {
486
487
488 readBuffer = null;
489 byteBuf.release();
490 }
491 return scheduled;
492 }
493
494 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
495 int writable = byteBuf.writableBytes();
496 long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
497 if (numDatagram <= 1) {
498 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
499 }
500 int i = 0;
501
502 for (; i < numDatagram && writable >= datagramSize; i++) {
503 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
504 break;
505 }
506 bufferAddress += datagramSize;
507 writable -= datagramSize;
508 }
509 return i;
510 }
511
512 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
513 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
514 if (msgHdrMemory == null) {
515
516 return false;
517 }
518 msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
519
520 int fd = fd().intValue();
521 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
522 IoRegistration registration = registration();
523
524
525 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
526 fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
527 long id = registration.submit(ops);
528 if (id == 0) {
529
530 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
531 return false;
532 }
533 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
534 return true;
535 }
536
537 @Override
538 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
539 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
540
541
542 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
543 sendmsgResArray[data] = res;
544
545 if (outstanding == 0) {
546
547 boolean writtenSomething = false;
548 int numWritten = sendmsgHdrs.length();
549 sendmsgHdrs.clear();
550 for (int i = 0; i < numWritten; i++) {
551 writtenSomething |= removeFromOutboundBuffer(
552 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
553 }
554 return writtenSomething;
555 }
556 return true;
557 }
558
559 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
560 if (res >= 0) {
561
562 return outboundBuffer.remove();
563 }
564 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565 return false;
566 }
567 try {
568 return ioResult(errormsg, res) != 0;
569 } catch (Throwable cause) {
570 Throwable e = (connected && cause instanceof NativeIoException) ?
571 translateForConnected((NativeIoException) cause) : cause;
572 return outboundBuffer.remove(e);
573 }
574 }
575
/**
 * Marks the channel as connected when the connect operation reported a non-negative result, then lets the
 * superclass handle the completion.
 */
@Override
void connectComplete(byte op, int res, int flags, short data) {
    if (res >= 0) {
        connected = true;
    }
    super.connectComplete(op, res, flags, data);
}
583
/**
 * Schedules writes for the flushed messages of {@code in} via the shared {@code WriteProcessor}.
 *
 * @return the number of writes that were scheduled
 */
@Override
protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
    return writeProcessor.write(in);
}
588
589 @Override
590 protected int scheduleWriteSingle(Object msg) {
591 return scheduleWrite(msg, true) ? 1 : 0;
592 }
593
594 private boolean scheduleWrite(Object msg, boolean first) {
595 final ByteBuf data;
596 final InetSocketAddress remoteAddress;
597 final int segmentSize;
598 if (msg instanceof AddressedEnvelope) {
599 @SuppressWarnings("unchecked")
600 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
601 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
602 data = envelope.content();
603 remoteAddress = envelope.recipient();
604 if (msg instanceof SegmentedDatagramPacket) {
605 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
606 } else {
607 segmentSize = 0;
608 }
609 } else {
610 data = (ByteBuf) msg;
611 remoteAddress = (InetSocketAddress) remoteAddress();
612 segmentSize = 0;
613 }
614
615 long bufferAddress = IoUring.memoryAddress(data);
616 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
617 }
618
619 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
620 int bufferLength, int segmentSize, boolean first) {
621 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
622 if (hdr == null) {
623
624
625 return false;
626 }
627 hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
628
629 int fd = fd().intValue();
630 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
631 IoRegistration registration = registration();
632 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
633 long id = registration.submit(ops);
634 if (id == 0) {
635
636 sendmsgHdrs.restoreNextHdr(hdr);
637 return false;
638 }
639 sendmsgHdrs.setId(hdr.idx(), id);
640 return true;
641 }
642
/**
 * Runs the superclass handling and then releases both the sendmsg and the recvmsg header arrays.
 */
@Override
public void unregistered() {
    super.unregistered();
    sendmsgHdrs.release();
    recvmsgHdrs.release();
}
649 }
650
651 private static IOException translateForConnected(NativeIoException e) {
652
653 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
654 PortUnreachableException error = new PortUnreachableException(e.getMessage());
655 error.initCause(e);
656 return error;
657 }
658 return e;
659 }
660
661
662
663
664
665
/**
 * Reports whether segmented datagram packets can be used with this transport.
 *
 * @return the result of {@code IoUring.isAvailable()}
 */
public static boolean isSegmentedDatagramPacketSupported() {
    return IoUring.isAvailable();
}
669
670 @Override
671 protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
672 if (numOutstandingReads > 0) {
673 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
674 assert canceled == numOutstandingReads;
675 }
676 }
677
678 @Override
679 protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
680 if (numOutstandingWrites > 0) {
681 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
682 assert canceled == numOutstandingWrites;
683 }
684 }
685
686 private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
687 int cancelled = 0;
688 for (int idx = 0; idx < array.length(); idx++) {
689 long id = array.id(idx);
690 if (id == MsgHdrMemoryArray.NO_ID) {
691 continue;
692 }
693
694
695 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
696 registration.submit(ops);
697 cancelled++;
698 }
699 return cancelled;
700 }
701
702 @Override
703 protected boolean socketIsEmpty(int flags) {
704 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
705 }
706
/**
 * Always returns {@code false} for datagram channels.
 */
@Override
boolean isPollInFirst() {
    return false;
}
711 }