1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.IoRegistration;
27 import io.netty.channel.socket.DatagramChannel;
28 import io.netty.channel.socket.DatagramChannelConfig;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.SocketProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.SegmentedDatagramPacket;
34 import io.netty.channel.unix.Socket;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.SystemPropertyUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.Inet4Address;
44 import java.net.InetAddress;
45 import java.net.InetSocketAddress;
46 import java.net.NetworkInterface;
47 import java.net.PortUnreachableException;
48 import java.net.SocketAddress;
49 import java.nio.channels.UnresolvedAddressException;
50
51 import static io.netty.channel.unix.Errors.ioResult;
52
53 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
// Logger for this channel class; used by the static initializer and the private constructor.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
// Value handed to setIpMulticastAll(...) in the private constructor. Read from the system property
// "io.netty.channel.iouring.ipMulticastAll"; false when the property is not set.
private static final boolean IP_MULTICAST_ALL =
        SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
// Shared metadata instance returned by metadata().
private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
// Suffix appended to the "unsupported message type" error thrown by filterOutboundMessage(...); it lists
// the message types that method accepts.
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
        StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
        StringUtil.simpleClassName(ByteBuf.class) + ", " +
        StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
        StringUtil.simpleClassName(ByteBuf.class) + ')';
64
// Configuration object; created in the private constructor and returned by config().
private final IoUringDatagramChannelConfig config;
// Set to true when a connect completes with a non-negative result; reset to false by doDisconnect()
// and doClose(). Exposed through isConnected().
private volatile boolean connected;

static {
    // Log the effective multicast setting once, at class initialization, when debug logging is on.
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
    }
}
73
74
75
76
77
78
79
// Header slots handed to recvmsg / sendmsg submissions (see scheduleRecvmsg0 and scheduleSendmsg).
// Each array is created with a capacity of 256; the schedule methods stop submitting when nextHdr()
// returns null.
private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
// Result code of each completed sendmsg, indexed by header slot; filled in writeComplete0 and consumed
// there once no writes are outstanding.
private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
83
84
85
86
87
/**
 * Creates a new datagram channel without an explicit protocol family. The {@code null} family is
 * forwarded to {@link #IoUringDatagramChannel(SocketProtocolFamily)}.
 */
public IoUringDatagramChannel() {
    this((SocketProtocolFamily) null);
}
91
92
93
94
95
/**
 * Creates a new datagram channel on top of a newly created datagram socket. The channel is created
 * with its {@code active} flag set to {@code false}.
 *
 * @param family the protocol family to use; {@code null} defers the choice to {@code useIpv6(...)}
 */
public IoUringDatagramChannel(SocketProtocolFamily family) {
    this(
            LinuxSocket.newSocketDgram(useIpv6(family)),
            false);
}
99
100 private static boolean useIpv6(SocketProtocolFamily family) {
101 if (family == null) {
102 return Socket.isIPv6Preferred();
103 }
104 return family == SocketProtocolFamily.INET6;
105 }
106
107
108
109
110
/**
 * Creates a new datagram channel that wraps an existing file descriptor. The channel is created with
 * its {@code active} flag set to {@code true}.
 *
 * @param fd the file descriptor to wrap in a {@code LinuxSocket}
 */
public IoUringDatagramChannel(int fd) {
    this(
            new LinuxSocket(fd),
            true);
}
114
/**
 * Shared constructor: hands the socket to the parent class, applies the {@code IP_MULTICAST_ALL}
 * setting and creates the channel configuration.
 *
 * @param fd     the socket backing this channel
 * @param active the initial active state passed to the parent constructor
 */
private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active);

    // Best effort only: a failure to apply the option is logged at debug level and otherwise ignored.
    try {
        fd.setIpMulticastAll(IP_MULTICAST_ALL);
    } catch (IOException cause) {
        logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, cause);
    }

    this.config = new IoUringDatagramChannelConfig(this);
}
128
129 @Override
130 public InetSocketAddress remoteAddress() {
131 return (InetSocketAddress) super.remoteAddress();
132 }
133
134 @Override
135 public InetSocketAddress localAddress() {
136 return (InetSocketAddress) super.localAddress();
137 }
138
139 @Override
140 public ChannelMetadata metadata() {
141 return METADATA;
142 }
143
144 @Override
145 public boolean isActive() {
146 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
147 }
148
149 @Override
150 public boolean isConnected() {
151 return connected;
152 }
153
154 @Override
155 public ChannelFuture joinGroup(InetAddress multicastAddress) {
156 return joinGroup(multicastAddress, newPromise());
157 }
158
159 @Override
160 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
161 try {
162 return joinGroup(
163 multicastAddress,
164 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
165 } catch (IOException e) {
166 promise.setFailure(e);
167 }
168 return promise;
169 }
170
171 @Override
172 public ChannelFuture joinGroup(
173 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
174 return joinGroup(multicastAddress, networkInterface, newPromise());
175 }
176
177 @Override
178 public ChannelFuture joinGroup(
179 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
180 ChannelPromise promise) {
181 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
182 }
183
184 @Override
185 public ChannelFuture joinGroup(
186 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
187 return joinGroup(multicastAddress, networkInterface, source, newPromise());
188 }
189
190 @Override
191 public ChannelFuture joinGroup(
192 final InetAddress multicastAddress, final NetworkInterface networkInterface,
193 final InetAddress source, final ChannelPromise promise) {
194
195 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
196 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
197
198 try {
199 socket.joinGroup(multicastAddress, networkInterface, source);
200 promise.setSuccess();
201 } catch (IOException e) {
202 promise.setFailure(e);
203 }
204 return promise;
205 }
206
207 @Override
208 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
209 return leaveGroup(multicastAddress, newPromise());
210 }
211
212 @Override
213 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
214 try {
215 return leaveGroup(
216 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
217 } catch (IOException e) {
218 promise.setFailure(e);
219 }
220 return promise;
221 }
222
223 @Override
224 public ChannelFuture leaveGroup(
225 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
226 return leaveGroup(multicastAddress, networkInterface, newPromise());
227 }
228
229 @Override
230 public ChannelFuture leaveGroup(
231 InetSocketAddress multicastAddress,
232 NetworkInterface networkInterface, ChannelPromise promise) {
233 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
234 }
235
236 @Override
237 public ChannelFuture leaveGroup(
238 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
239 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
240 }
241
242 @Override
243 public ChannelFuture leaveGroup(
244 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
245 final ChannelPromise promise) {
246 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
247 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
248
249 try {
250 socket.leaveGroup(multicastAddress, networkInterface, source);
251 promise.setSuccess();
252 } catch (IOException e) {
253 promise.setFailure(e);
254 }
255 return promise;
256 }
257
258 @Override
259 public ChannelFuture block(
260 InetAddress multicastAddress, NetworkInterface networkInterface,
261 InetAddress sourceToBlock) {
262 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
263 }
264
265 @Override
266 public ChannelFuture block(
267 final InetAddress multicastAddress, final NetworkInterface networkInterface,
268 final InetAddress sourceToBlock, final ChannelPromise promise) {
269 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
270 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
271 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
272
273 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
274 return promise;
275 }
276
277 @Override
278 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
279 return block(multicastAddress, sourceToBlock, newPromise());
280 }
281
282 @Override
283 public ChannelFuture block(
284 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
285 try {
286 return block(
287 multicastAddress,
288 NetworkInterface.getByInetAddress(localAddress().getAddress()),
289 sourceToBlock, promise);
290 } catch (Throwable e) {
291 promise.setFailure(e);
292 }
293 return promise;
294 }
295
296 @Override
297 protected AbstractUnsafe newUnsafe() {
298 return new IoUringDatagramChannelUnsafe();
299 }
300
301 @Override
302 protected void doBind(SocketAddress localAddress) throws Exception {
303 if (localAddress instanceof InetSocketAddress) {
304 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
305 if (socketAddress.getAddress().isAnyLocalAddress() &&
306 socketAddress.getAddress() instanceof Inet4Address) {
307 if (socket.family() == SocketProtocolFamily.INET6) {
308 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
309 }
310 }
311 }
312 super.doBind(localAddress);
313 active = true;
314 }
315
316 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
317 if (envelope.recipient() instanceof InetSocketAddress
318 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
319 throw new UnresolvedAddressException();
320 }
321 }
322
323 @Override
324 protected Object filterOutboundMessage(Object msg) {
325 if (msg instanceof DatagramPacket) {
326 DatagramPacket packet = (DatagramPacket) msg;
327 checkUnresolved(packet);
328 ByteBuf content = packet.content();
329 return !content.hasMemoryAddress() ?
330 packet.replace(newDirectBuffer(packet, content)) : msg;
331 }
332
333 if (msg instanceof ByteBuf) {
334 ByteBuf buf = (ByteBuf) msg;
335 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
336 }
337
338 if (msg instanceof AddressedEnvelope) {
339 @SuppressWarnings("unchecked")
340 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
341 checkUnresolved(e);
342 if (e.content() instanceof ByteBuf &&
343 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
344
345 ByteBuf content = (ByteBuf) e.content();
346 return !content.hasMemoryAddress()?
347 new DefaultAddressedEnvelope<>(
348 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
349 }
350 }
351
352 throw new UnsupportedOperationException(
353 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
354 }
355
356 @Override
357 public DatagramChannelConfig config() {
358 return config;
359 }
360
361 @Override
362 protected void doDisconnect() throws Exception {
363
364 socket.disconnect();
365 connected = active = false;
366
367 resetCachedAddresses();
368 }
369
370 @Override
371 protected void doClose() throws Exception {
372 super.doClose();
373 connected = false;
374 }
375
376 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
// Reusable processor used by scheduleWriteMultiple(...) to schedule every flushed message.
private final WriteProcessor writeProcessor = new WriteProcessor();

// Buffer receiving the currently outstanding recvmsg operations. Assigned in scheduleRead0(...) and
// released and set back to null once no reads are outstanding or nothing could be scheduled.
private ByteBuf readBuffer;
380
381 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
382 private int written;
383 @Override
384 public boolean processMessage(Object msg) {
385 if (scheduleWrite(msg, written == 0)) {
386 written++;
387 return true;
388 }
389 return false;
390 }
391
392 int write(ChannelOutboundBuffer in) {
393 written = 0;
394 try {
395 in.forEachFlushedMessage(this);
396 } catch (Exception e) {
397
398 throw new IllegalStateException(e);
399 }
400 return written;
401 }
402 }
403
404 @Override
405 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
406 assert outstanding != -1 : "multi-shot not implemented yet";
407
408 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
409 final ChannelPipeline pipeline = pipeline();
410 ByteBuf byteBuf = this.readBuffer;
411 assert byteBuf != null;
412 try {
413 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
414 } catch (Throwable t) {
415 Throwable e = (connected && t instanceof NativeIoException) ?
416 translateForConnected((NativeIoException) t) : t;
417 pipeline.fireExceptionCaught(e);
418 }
419 }
420
421 private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
422 ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
423 throws IOException {
424 MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
425 if (res < 0) {
426 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
427
428
429 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
430 }
431 } else {
432 allocHandle.lastBytesRead(res);
433 if (hdr.hasPort(IoUringDatagramChannel.this)) {
434 allocHandle.incMessagesRead(1);
435 DatagramPacket packet = hdr.get(
436 IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
437 pipeline.fireChannelRead(packet);
438 }
439 }
440
441
442 recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
443 if (outstanding == 0) {
444
445
446 this.readBuffer.release();
447 this.readBuffer = null;
448 recvmsgHdrs.clear();
449
450 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
451 if (allocHandle.lastBytesRead() > 0 &&
452 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
453
454
455
456
457 (!IoUring.isCqeFSockNonEmptySupported() ||
458 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
459
460 scheduleRead(false);
461 } else {
462
463 allocHandle.readComplete();
464 pipeline.fireChannelReadComplete();
465 }
466 }
467 }
468 }
469
470 @Override
471 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
472 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
473 ByteBuf byteBuf = allocHandle.allocate(alloc());
474 assert readBuffer == null;
475 readBuffer = byteBuf;
476
477 int writable = byteBuf.writableBytes();
478 allocHandle.attemptedBytesRead(writable);
479 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
480
481 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
482
483 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
484 if (scheduled == 0) {
485
486
487 readBuffer = null;
488 byteBuf.release();
489 }
490 return scheduled;
491 }
492
493 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
494 int writable = byteBuf.writableBytes();
495 long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
496 if (numDatagram <= 1) {
497 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
498 }
499 int i = 0;
500
501 for (; i < numDatagram && writable >= datagramSize; i++) {
502 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
503 break;
504 }
505 bufferAddress += datagramSize;
506 writable -= datagramSize;
507 }
508 return i;
509 }
510
511 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
512 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
513 if (msgHdrMemory == null) {
514
515 return false;
516 }
517 msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
518
519 int fd = fd().intValue();
520 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
521 IoRegistration registration = registration();
522
523
524 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
525 fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
526 long id = registration.submit(ops);
527 if (id == 0) {
528
529 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
530 return false;
531 }
532 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
533 return true;
534 }
535
536 @Override
537 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
538 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
539
540
541 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
542 sendmsgResArray[data] = res;
543
544 if (outstanding == 0) {
545
546 boolean writtenSomething = false;
547 int numWritten = sendmsgHdrs.length();
548 sendmsgHdrs.clear();
549 for (int i = 0; i < numWritten; i++) {
550 writtenSomething |= removeFromOutboundBuffer(
551 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
552 }
553 return writtenSomething;
554 }
555 return true;
556 }
557
558 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
559 if (res >= 0) {
560
561 return outboundBuffer.remove();
562 }
563 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
564 return false;
565 }
566 try {
567 return ioResult(errormsg, res) != 0;
568 } catch (Throwable cause) {
569 return outboundBuffer.remove(cause);
570 }
571 }
572
573 @Override
574 void connectComplete(byte op, int res, int flags, short data) {
575 if (res >= 0) {
576 connected = true;
577 }
578 super.connectComplete(op, res, flags, data);
579 }
580
581 @Override
582 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
583 return writeProcessor.write(in);
584 }
585
586 @Override
587 protected int scheduleWriteSingle(Object msg) {
588 return scheduleWrite(msg, true) ? 1 : 0;
589 }
590
591 private boolean scheduleWrite(Object msg, boolean first) {
592 final ByteBuf data;
593 final InetSocketAddress remoteAddress;
594 final int segmentSize;
595 if (msg instanceof AddressedEnvelope) {
596 @SuppressWarnings("unchecked")
597 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
598 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
599 data = envelope.content();
600 remoteAddress = envelope.recipient();
601 if (msg instanceof SegmentedDatagramPacket) {
602 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
603 } else {
604 segmentSize = 0;
605 }
606 } else {
607 data = (ByteBuf) msg;
608 remoteAddress = (InetSocketAddress) remoteAddress();
609 segmentSize = 0;
610 }
611
612 long bufferAddress = IoUring.memoryAddress(data);
613 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
614 }
615
616 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
617 int bufferLength, int segmentSize, boolean first) {
618 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
619 if (hdr == null) {
620
621
622 return false;
623 }
624 hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
625
626 int fd = fd().intValue();
627 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
628 IoRegistration registration = registration();
629 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
630 long id = registration.submit(ops);
631 if (id == 0) {
632
633 sendmsgHdrs.restoreNextHdr(hdr);
634 return false;
635 }
636 sendmsgHdrs.setId(hdr.idx(), id);
637 return true;
638 }
639
640 @Override
641 public void unregistered() {
642 super.unregistered();
643 sendmsgHdrs.release();
644 recvmsgHdrs.release();
645 }
646 }
647
648 private static IOException translateForConnected(NativeIoException e) {
649
650 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
651 PortUnreachableException error = new PortUnreachableException(e.getMessage());
652 error.initCause(e);
653 return error;
654 }
655 return e;
656 }
657
658
659
660
661
662
663 public static boolean isSegmentedDatagramPacketSupported() {
664 return IoUring.isAvailable();
665 }
666
667 @Override
668 protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
669 if (numOutstandingReads > 0) {
670 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
671 assert canceled == numOutstandingReads;
672 }
673 }
674
675 @Override
676 protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
677 if (numOutstandingWrites > 0) {
678 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
679 assert canceled == numOutstandingWrites;
680 }
681 }
682
683 private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
684 int cancelled = 0;
685 for (int idx = 0; idx < array.length(); idx++) {
686 long id = array.id(idx);
687 if (id == MsgHdrMemoryArray.NO_ID) {
688 continue;
689 }
690
691
692 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
693 registration.submit(ops);
694 cancelled++;
695 }
696 return cancelled;
697 }
698
699 @Override
700 protected boolean socketIsEmpty(int flags) {
701 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
702 }
703
704 @Override
705 boolean isPollInFirst() {
706 return false;
707 }
708 }