1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.IoRegistration;
27 import io.netty.channel.socket.DatagramChannel;
28 import io.netty.channel.socket.DatagramChannelConfig;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.SocketProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.SegmentedDatagramPacket;
34 import io.netty.channel.unix.Socket;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.StringUtil;
38
39 import java.io.IOException;
40 import java.net.Inet4Address;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.NetworkInterface;
44 import java.net.PortUnreachableException;
45 import java.net.SocketAddress;
46 import java.nio.channels.UnresolvedAddressException;
47
48 import static io.netty.channel.unix.Errors.ioResult;
49
50 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
51 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
52 private static final String EXPECTED_TYPES =
53 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
54 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
55 StringUtil.simpleClassName(ByteBuf.class) + ", " +
56 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
57 StringUtil.simpleClassName(ByteBuf.class) + ')';
58
59 private final IoUringDatagramChannelConfig config;
60 private volatile boolean connected;
61
62
63
64
65
66
67
68 private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
69 private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
70 private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
71
72
73
74
75
76 public IoUringDatagramChannel() {
77 this(null);
78 }
79
80
81
82
83
84 public IoUringDatagramChannel(SocketProtocolFamily family) {
85 this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
86 }
87
88 private static boolean useIpv6(SocketProtocolFamily family) {
89 if (family == null) {
90 return Socket.isIPv6Preferred();
91 }
92 return family == SocketProtocolFamily.INET6;
93 }
94
95
96
97
98
99 public IoUringDatagramChannel(int fd) {
100 this(new LinuxSocket(fd), true);
101 }
102
103 private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
104
105 super(null, fd, active);
106 config = new IoUringDatagramChannelConfig(this);
107 }
108
109 @Override
110 public InetSocketAddress remoteAddress() {
111 return (InetSocketAddress) super.remoteAddress();
112 }
113
114 @Override
115 public InetSocketAddress localAddress() {
116 return (InetSocketAddress) super.localAddress();
117 }
118
119 @Override
120 public ChannelMetadata metadata() {
121 return METADATA;
122 }
123
124 @Override
125 public boolean isActive() {
126 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
127 }
128
129 @Override
130 public boolean isConnected() {
131 return connected;
132 }
133
134 @Override
135 public ChannelFuture joinGroup(InetAddress multicastAddress) {
136 return joinGroup(multicastAddress, newPromise());
137 }
138
139 @Override
140 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141 try {
142 return joinGroup(
143 multicastAddress,
144 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
145 } catch (IOException e) {
146 promise.setFailure(e);
147 }
148 return promise;
149 }
150
151 @Override
152 public ChannelFuture joinGroup(
153 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
154 return joinGroup(multicastAddress, networkInterface, newPromise());
155 }
156
157 @Override
158 public ChannelFuture joinGroup(
159 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
160 ChannelPromise promise) {
161 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
162 }
163
164 @Override
165 public ChannelFuture joinGroup(
166 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
167 return joinGroup(multicastAddress, networkInterface, source, newPromise());
168 }
169
170 @Override
171 public ChannelFuture joinGroup(
172 final InetAddress multicastAddress, final NetworkInterface networkInterface,
173 final InetAddress source, final ChannelPromise promise) {
174
175 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
176 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
177
178 try {
179 socket.joinGroup(multicastAddress, networkInterface, source);
180 promise.setSuccess();
181 } catch (IOException e) {
182 promise.setFailure(e);
183 }
184 return promise;
185 }
186
187 @Override
188 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189 return leaveGroup(multicastAddress, newPromise());
190 }
191
192 @Override
193 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
194 try {
195 return leaveGroup(
196 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
197 } catch (IOException e) {
198 promise.setFailure(e);
199 }
200 return promise;
201 }
202
203 @Override
204 public ChannelFuture leaveGroup(
205 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
206 return leaveGroup(multicastAddress, networkInterface, newPromise());
207 }
208
209 @Override
210 public ChannelFuture leaveGroup(
211 InetSocketAddress multicastAddress,
212 NetworkInterface networkInterface, ChannelPromise promise) {
213 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
214 }
215
216 @Override
217 public ChannelFuture leaveGroup(
218 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
219 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
220 }
221
222 @Override
223 public ChannelFuture leaveGroup(
224 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
225 final ChannelPromise promise) {
226 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
227 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
228
229 try {
230 socket.leaveGroup(multicastAddress, networkInterface, source);
231 promise.setSuccess();
232 } catch (IOException e) {
233 promise.setFailure(e);
234 }
235 return promise;
236 }
237
238 @Override
239 public ChannelFuture block(
240 InetAddress multicastAddress, NetworkInterface networkInterface,
241 InetAddress sourceToBlock) {
242 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
243 }
244
245 @Override
246 public ChannelFuture block(
247 final InetAddress multicastAddress, final NetworkInterface networkInterface,
248 final InetAddress sourceToBlock, final ChannelPromise promise) {
249 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
250 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
251 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
252
253 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
254 return promise;
255 }
256
257 @Override
258 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
259 return block(multicastAddress, sourceToBlock, newPromise());
260 }
261
262 @Override
263 public ChannelFuture block(
264 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
265 try {
266 return block(
267 multicastAddress,
268 NetworkInterface.getByInetAddress(localAddress().getAddress()),
269 sourceToBlock, promise);
270 } catch (Throwable e) {
271 promise.setFailure(e);
272 }
273 return promise;
274 }
275
276 @Override
277 protected AbstractUnsafe newUnsafe() {
278 return new IoUringDatagramChannelUnsafe();
279 }
280
281 @Override
282 protected void doBind(SocketAddress localAddress) throws Exception {
283 if (localAddress instanceof InetSocketAddress) {
284 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
285 if (socketAddress.getAddress().isAnyLocalAddress() &&
286 socketAddress.getAddress() instanceof Inet4Address) {
287 if (socket.family() == SocketProtocolFamily.INET6) {
288 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
289 }
290 }
291 }
292 super.doBind(localAddress);
293 active = true;
294 }
295
296 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297 if (envelope.recipient() instanceof InetSocketAddress
298 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299 throw new UnresolvedAddressException();
300 }
301 }
302
303 @Override
304 protected Object filterOutboundMessage(Object msg) {
305 if (msg instanceof DatagramPacket) {
306 DatagramPacket packet = (DatagramPacket) msg;
307 checkUnresolved(packet);
308 ByteBuf content = packet.content();
309 return !content.hasMemoryAddress() ?
310 packet.replace(newDirectBuffer(packet, content)) : msg;
311 }
312
313 if (msg instanceof ByteBuf) {
314 ByteBuf buf = (ByteBuf) msg;
315 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
316 }
317
318 if (msg instanceof AddressedEnvelope) {
319 @SuppressWarnings("unchecked")
320 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
321 checkUnresolved(e);
322 if (e.content() instanceof ByteBuf &&
323 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
324
325 ByteBuf content = (ByteBuf) e.content();
326 return !content.hasMemoryAddress()?
327 new DefaultAddressedEnvelope<>(
328 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
329 }
330 }
331
332 throw new UnsupportedOperationException(
333 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
334 }
335
336 @Override
337 public DatagramChannelConfig config() {
338 return config;
339 }
340
341 @Override
342 protected void doDisconnect() throws Exception {
343
344 socket.disconnect();
345 connected = active = false;
346
347 resetCachedAddresses();
348 }
349
350 @Override
351 protected void doClose() throws Exception {
352 super.doClose();
353 connected = false;
354 }
355
356 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
357 private final WriteProcessor writeProcessor = new WriteProcessor();
358
359 private ByteBuf readBuffer;
360
361 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
362 private int written;
363 @Override
364 public boolean processMessage(Object msg) {
365 if (scheduleWrite(msg, written == 0)) {
366 written++;
367 return true;
368 }
369 return false;
370 }
371
372 int write(ChannelOutboundBuffer in) {
373 written = 0;
374 try {
375 in.forEachFlushedMessage(this);
376 } catch (Exception e) {
377
378 throw new IllegalStateException(e);
379 }
380 return written;
381 }
382 }
383
384 @Override
385 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
386 assert outstanding != -1 : "multi-shot not implemented yet";
387
388 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389 final ChannelPipeline pipeline = pipeline();
390 ByteBuf byteBuf = this.readBuffer;
391 assert byteBuf != null;
392 try {
393 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
394 } catch (Throwable t) {
395 if (connected && t instanceof NativeIoException) {
396 t = translateForConnected((NativeIoException) t);
397 }
398 pipeline.fireExceptionCaught(t);
399 }
400 }
401
402 private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
403 ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
404 throws IOException {
405 MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
406 if (res < 0) {
407 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
408
409
410 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
411 }
412 } else {
413 allocHandle.lastBytesRead(res);
414 if (hdr.hasPort(IoUringDatagramChannel.this)) {
415 allocHandle.incMessagesRead(1);
416 DatagramPacket packet = hdr.read(
417 IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
418 pipeline.fireChannelRead(packet);
419 }
420 }
421
422
423 recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
424 if (outstanding == 0) {
425
426
427 this.readBuffer.release();
428 this.readBuffer = null;
429 recvmsgHdrs.clear();
430
431 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
432 if (allocHandle.lastBytesRead() > 0 &&
433 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
434
435
436
437
438 (!IoUring.isCqeFSockNonEmptySupported() ||
439 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
440
441 scheduleRead(false);
442 } else {
443
444 allocHandle.readComplete();
445 pipeline.fireChannelReadComplete();
446 }
447 }
448 }
449 }
450
451 @Override
452 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
453 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
454 ByteBuf byteBuf = allocHandle.allocate(alloc());
455 assert readBuffer == null;
456 readBuffer = byteBuf;
457
458 int writable = byteBuf.writableBytes();
459 allocHandle.attemptedBytesRead(writable);
460 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
461
462 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
463
464 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
465 if (scheduled == 0) {
466
467
468 readBuffer = null;
469 byteBuf.release();
470 }
471 return scheduled;
472 }
473
474 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
475 int writable = byteBuf.writableBytes();
476 long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
477 if (numDatagram <= 1) {
478 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
479 }
480 int i = 0;
481
482 for (; i < numDatagram && writable >= datagramSize; i++) {
483 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
484 break;
485 }
486 bufferAddress += datagramSize;
487 writable -= datagramSize;
488 }
489 return i;
490 }
491
492 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
493 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
494 if (msgHdrMemory == null) {
495
496 return false;
497 }
498 msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
499
500 int fd = fd().intValue();
501 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
502 IoRegistration registration = registration();
503
504
505 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
506 fd, flags((byte) 0), msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
507 long id = registration.submit(ops);
508 if (id == 0) {
509
510 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
511 return false;
512 }
513 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
514 return true;
515 }
516
517 @Override
518 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
519 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
520
521
522 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
523 sendmsgResArray[data] = res;
524
525 if (outstanding == 0) {
526
527 boolean writtenSomething = false;
528 int numWritten = sendmsgHdrs.length();
529 sendmsgHdrs.clear();
530 for (int i = 0; i < numWritten; i++) {
531 writtenSomething |= removeFromOutboundBuffer(
532 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
533 }
534 return writtenSomething;
535 }
536 return true;
537 }
538
539 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
540 if (res >= 0) {
541
542 return outboundBuffer.remove();
543 }
544 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
545 return false;
546 }
547 try {
548 return ioResult(errormsg, res) != 0;
549 } catch (Throwable cause) {
550 return outboundBuffer.remove(cause);
551 }
552 }
553
554 @Override
555 void connectComplete(byte op, int res, int flags, short data) {
556 if (res >= 0) {
557 connected = true;
558 }
559 super.connectComplete(op, res, flags, data);
560 }
561
562 @Override
563 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
564 return writeProcessor.write(in);
565 }
566
567 @Override
568 protected int scheduleWriteSingle(Object msg) {
569 return scheduleWrite(msg, true) ? 1 : 0;
570 }
571
572 private boolean scheduleWrite(Object msg, boolean first) {
573 final ByteBuf data;
574 final InetSocketAddress remoteAddress;
575 final int segmentSize;
576 if (msg instanceof AddressedEnvelope) {
577 @SuppressWarnings("unchecked")
578 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
579 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
580 data = envelope.content();
581 remoteAddress = envelope.recipient();
582 if (msg instanceof SegmentedDatagramPacket) {
583 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
584 } else {
585 segmentSize = 0;
586 }
587 } else {
588 data = (ByteBuf) msg;
589 remoteAddress = (InetSocketAddress) remoteAddress();
590 segmentSize = 0;
591 }
592
593 long bufferAddress = data.memoryAddress();
594 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
595 }
596
597 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
598 int bufferLength, int segmentSize, boolean first) {
599 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
600 if (hdr == null) {
601
602
603 return false;
604 }
605 hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
606
607 int fd = fd().intValue();
608 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
609 IoRegistration registration = registration();
610 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), msgFlags, hdr.address(), hdr.idx());
611 long id = registration.submit(ops);
612 if (id == 0) {
613
614 sendmsgHdrs.restoreNextHdr(hdr);
615 return false;
616 }
617 sendmsgHdrs.setId(hdr.idx(), id);
618 return true;
619 }
620
621 @Override
622 protected void freeResourcesNow(IoRegistration reg) {
623 sendmsgHdrs.release();
624 recvmsgHdrs.release();
625 super.freeResourcesNow(reg);
626 }
627 }
628
629 private static IOException translateForConnected(NativeIoException e) {
630
631 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
632 PortUnreachableException error = new PortUnreachableException(e.getMessage());
633 error.initCause(e);
634 return error;
635 }
636 return e;
637 }
638
639
640
641
642
643
644 public static boolean isSegmentedDatagramPacketSupported() {
645 return IoUring.isAvailable();
646 }
647
648 @Override
649 protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
650 if (numOutstandingReads > 0) {
651 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
652 assert canceled == numOutstandingReads;
653 }
654 }
655
656 @Override
657 protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
658 if (numOutstandingWrites > 0) {
659 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
660 assert canceled == numOutstandingWrites;
661 }
662 }
663
664 private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
665 int cancelled = 0;
666 for (int idx = 0; idx < array.length(); idx++) {
667 long id = array.id(idx);
668 if (id == MsgHdrMemoryArray.NO_ID) {
669 continue;
670 }
671
672
673 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), id, op);
674 registration.submit(ops);
675 cancelled++;
676 }
677 return cancelled;
678 }
679
680 @Override
681 protected boolean socketIsEmpty(int flags) {
682 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
683 }
684
685 @Override
686 boolean isPollInFirst() {
687 return false;
688 }
689 }