1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.IoRegistration;
27 import io.netty.channel.socket.DatagramChannel;
28 import io.netty.channel.socket.DatagramChannelConfig;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.SocketProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.SegmentedDatagramPacket;
34 import io.netty.channel.unix.Socket;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.StringUtil;
38
39 import java.io.IOException;
40 import java.net.Inet4Address;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.NetworkInterface;
44 import java.net.PortUnreachableException;
45 import java.net.SocketAddress;
46 import java.nio.channels.UnresolvedAddressException;
47
48 import static io.netty.channel.unix.Errors.ioResult;
49
50 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
51 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
52 private static final String EXPECTED_TYPES =
53 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
54 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
55 StringUtil.simpleClassName(ByteBuf.class) + ", " +
56 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
57 StringUtil.simpleClassName(ByteBuf.class) + ')';
58
59 private final IoUringDatagramChannelConfig config;
60 private volatile boolean connected;
61
62
63
64
65
66
67
68 private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
69 private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
70 private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
71
72
73
74
75
76 public IoUringDatagramChannel() {
77 this(null);
78 }
79
80
81
82
83
84 public IoUringDatagramChannel(SocketProtocolFamily family) {
85 this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
86 }
87
88 private static boolean useIpv6(SocketProtocolFamily family) {
89 if (family == null) {
90 return Socket.isIPv6Preferred();
91 }
92 return family == SocketProtocolFamily.INET6;
93 }
94
95
96
97
98
99 public IoUringDatagramChannel(int fd) {
100 this(new LinuxSocket(fd), true);
101 }
102
103 private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
104
105 super(null, fd, active);
106 config = new IoUringDatagramChannelConfig(this);
107 }
108
109 @Override
110 public InetSocketAddress remoteAddress() {
111 return (InetSocketAddress) super.remoteAddress();
112 }
113
114 @Override
115 public InetSocketAddress localAddress() {
116 return (InetSocketAddress) super.localAddress();
117 }
118
119 @Override
120 public ChannelMetadata metadata() {
121 return METADATA;
122 }
123
124 @Override
125 public boolean isActive() {
126 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
127 }
128
129 @Override
130 public boolean isConnected() {
131 return connected;
132 }
133
134 @Override
135 public ChannelFuture joinGroup(InetAddress multicastAddress) {
136 return joinGroup(multicastAddress, newPromise());
137 }
138
139 @Override
140 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141 try {
142 return joinGroup(
143 multicastAddress,
144 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
145 } catch (IOException e) {
146 promise.setFailure(e);
147 }
148 return promise;
149 }
150
151 @Override
152 public ChannelFuture joinGroup(
153 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
154 return joinGroup(multicastAddress, networkInterface, newPromise());
155 }
156
157 @Override
158 public ChannelFuture joinGroup(
159 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
160 ChannelPromise promise) {
161 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
162 }
163
164 @Override
165 public ChannelFuture joinGroup(
166 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
167 return joinGroup(multicastAddress, networkInterface, source, newPromise());
168 }
169
170 @Override
171 public ChannelFuture joinGroup(
172 final InetAddress multicastAddress, final NetworkInterface networkInterface,
173 final InetAddress source, final ChannelPromise promise) {
174
175 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
176 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
177
178 try {
179 socket.joinGroup(multicastAddress, networkInterface, source);
180 promise.setSuccess();
181 } catch (IOException e) {
182 promise.setFailure(e);
183 }
184 return promise;
185 }
186
187 @Override
188 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189 return leaveGroup(multicastAddress, newPromise());
190 }
191
192 @Override
193 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
194 try {
195 return leaveGroup(
196 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
197 } catch (IOException e) {
198 promise.setFailure(e);
199 }
200 return promise;
201 }
202
203 @Override
204 public ChannelFuture leaveGroup(
205 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
206 return leaveGroup(multicastAddress, networkInterface, newPromise());
207 }
208
209 @Override
210 public ChannelFuture leaveGroup(
211 InetSocketAddress multicastAddress,
212 NetworkInterface networkInterface, ChannelPromise promise) {
213 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
214 }
215
216 @Override
217 public ChannelFuture leaveGroup(
218 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
219 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
220 }
221
222 @Override
223 public ChannelFuture leaveGroup(
224 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
225 final ChannelPromise promise) {
226 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
227 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
228
229 try {
230 socket.leaveGroup(multicastAddress, networkInterface, source);
231 promise.setSuccess();
232 } catch (IOException e) {
233 promise.setFailure(e);
234 }
235 return promise;
236 }
237
238 @Override
239 public ChannelFuture block(
240 InetAddress multicastAddress, NetworkInterface networkInterface,
241 InetAddress sourceToBlock) {
242 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
243 }
244
245 @Override
246 public ChannelFuture block(
247 final InetAddress multicastAddress, final NetworkInterface networkInterface,
248 final InetAddress sourceToBlock, final ChannelPromise promise) {
249 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
250 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
251 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
252
253 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
254 return promise;
255 }
256
257 @Override
258 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
259 return block(multicastAddress, sourceToBlock, newPromise());
260 }
261
262 @Override
263 public ChannelFuture block(
264 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
265 try {
266 return block(
267 multicastAddress,
268 NetworkInterface.getByInetAddress(localAddress().getAddress()),
269 sourceToBlock, promise);
270 } catch (Throwable e) {
271 promise.setFailure(e);
272 }
273 return promise;
274 }
275
276 @Override
277 protected AbstractUnsafe newUnsafe() {
278 return new IoUringDatagramChannelUnsafe();
279 }
280
281 @Override
282 protected void doBind(SocketAddress localAddress) throws Exception {
283 if (localAddress instanceof InetSocketAddress) {
284 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
285 if (socketAddress.getAddress().isAnyLocalAddress() &&
286 socketAddress.getAddress() instanceof Inet4Address) {
287 if (socket.family() == SocketProtocolFamily.INET6) {
288 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
289 }
290 }
291 }
292 super.doBind(localAddress);
293 active = true;
294 }
295
296 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297 if (envelope.recipient() instanceof InetSocketAddress
298 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299 throw new UnresolvedAddressException();
300 }
301 }
302
303 @Override
304 protected Object filterOutboundMessage(Object msg) {
305 if (msg instanceof DatagramPacket) {
306 DatagramPacket packet = (DatagramPacket) msg;
307 checkUnresolved(packet);
308 ByteBuf content = packet.content();
309 return !content.hasMemoryAddress() ?
310 packet.replace(newDirectBuffer(packet, content)) : msg;
311 }
312
313 if (msg instanceof ByteBuf) {
314 ByteBuf buf = (ByteBuf) msg;
315 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
316 }
317
318 if (msg instanceof AddressedEnvelope) {
319 @SuppressWarnings("unchecked")
320 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
321 checkUnresolved(e);
322 if (e.content() instanceof ByteBuf &&
323 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
324
325 ByteBuf content = (ByteBuf) e.content();
326 return !content.hasMemoryAddress()?
327 new DefaultAddressedEnvelope<>(
328 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
329 }
330 }
331
332 throw new UnsupportedOperationException(
333 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
334 }
335
336 @Override
337 public DatagramChannelConfig config() {
338 return config;
339 }
340
341 @Override
342 protected void doDisconnect() throws Exception {
343
344 socket.disconnect();
345 connected = active = false;
346
347 resetCachedAddresses();
348 }
349
350 @Override
351 protected void doClose() throws Exception {
352 super.doClose();
353 connected = false;
354 }
355
356 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
357 private final WriteProcessor writeProcessor = new WriteProcessor();
358
359 private ByteBuf readBuffer;
360
361 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
362 private int written;
363 @Override
364 public boolean processMessage(Object msg) {
365 if (scheduleWrite(msg, written == 0)) {
366 written++;
367 return true;
368 }
369 return false;
370 }
371
372 int write(ChannelOutboundBuffer in) {
373 written = 0;
374 try {
375 in.forEachFlushedMessage(this);
376 } catch (Exception e) {
377
378 throw new IllegalStateException(e);
379 }
380 return written;
381 }
382 }
383
384 @Override
385 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
386 assert outstanding != -1 : "multi-shot not implemented yet";
387
388 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389 final ChannelPipeline pipeline = pipeline();
390 ByteBuf byteBuf = this.readBuffer;
391 assert byteBuf != null;
392 try {
393 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
394 } catch (Throwable t) {
395 Throwable e = (connected && t instanceof NativeIoException) ?
396 translateForConnected((NativeIoException) t) : t;
397 pipeline.fireExceptionCaught(e);
398 }
399 }
400
401 private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
402 ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
403 throws IOException {
404 MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
405 if (res < 0) {
406 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
407
408
409 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
410 }
411 } else {
412 allocHandle.lastBytesRead(res);
413 if (hdr.hasPort(IoUringDatagramChannel.this)) {
414 allocHandle.incMessagesRead(1);
415 DatagramPacket packet = hdr.get(
416 IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
417 pipeline.fireChannelRead(packet);
418 }
419 }
420
421
422 recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
423 if (outstanding == 0) {
424
425
426 this.readBuffer.release();
427 this.readBuffer = null;
428 recvmsgHdrs.clear();
429
430 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
431 if (allocHandle.lastBytesRead() > 0 &&
432 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
433
434
435
436
437 (!IoUring.isCqeFSockNonEmptySupported() ||
438 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
439
440 scheduleRead(false);
441 } else {
442
443 allocHandle.readComplete();
444 pipeline.fireChannelReadComplete();
445 }
446 }
447 }
448 }
449
450 @Override
451 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
452 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
453 ByteBuf byteBuf = allocHandle.allocate(alloc());
454 assert readBuffer == null;
455 readBuffer = byteBuf;
456
457 int writable = byteBuf.writableBytes();
458 allocHandle.attemptedBytesRead(writable);
459 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
460
461 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
462
463 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
464 if (scheduled == 0) {
465
466
467 readBuffer = null;
468 byteBuf.release();
469 }
470 return scheduled;
471 }
472
473 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
474 int writable = byteBuf.writableBytes();
475 long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
476 if (numDatagram <= 1) {
477 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
478 }
479 int i = 0;
480
481 for (; i < numDatagram && writable >= datagramSize; i++) {
482 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
483 break;
484 }
485 bufferAddress += datagramSize;
486 writable -= datagramSize;
487 }
488 return i;
489 }
490
491 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
492 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
493 if (msgHdrMemory == null) {
494
495 return false;
496 }
497 msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
498
499 int fd = fd().intValue();
500 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
501 IoRegistration registration = registration();
502
503
504 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
505 fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
506 long id = registration.submit(ops);
507 if (id == 0) {
508
509 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
510 return false;
511 }
512 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
513 return true;
514 }
515
516 @Override
517 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
518 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
519
520
521 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
522 sendmsgResArray[data] = res;
523
524 if (outstanding == 0) {
525
526 boolean writtenSomething = false;
527 int numWritten = sendmsgHdrs.length();
528 sendmsgHdrs.clear();
529 for (int i = 0; i < numWritten; i++) {
530 writtenSomething |= removeFromOutboundBuffer(
531 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
532 }
533 return writtenSomething;
534 }
535 return true;
536 }
537
538 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
539 if (res >= 0) {
540
541 return outboundBuffer.remove();
542 }
543 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
544 return false;
545 }
546 try {
547 return ioResult(errormsg, res) != 0;
548 } catch (Throwable cause) {
549 return outboundBuffer.remove(cause);
550 }
551 }
552
553 @Override
554 void connectComplete(byte op, int res, int flags, short data) {
555 if (res >= 0) {
556 connected = true;
557 }
558 super.connectComplete(op, res, flags, data);
559 }
560
561 @Override
562 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
563 return writeProcessor.write(in);
564 }
565
566 @Override
567 protected int scheduleWriteSingle(Object msg) {
568 return scheduleWrite(msg, true) ? 1 : 0;
569 }
570
571 private boolean scheduleWrite(Object msg, boolean first) {
572 final ByteBuf data;
573 final InetSocketAddress remoteAddress;
574 final int segmentSize;
575 if (msg instanceof AddressedEnvelope) {
576 @SuppressWarnings("unchecked")
577 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
578 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
579 data = envelope.content();
580 remoteAddress = envelope.recipient();
581 if (msg instanceof SegmentedDatagramPacket) {
582 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
583 } else {
584 segmentSize = 0;
585 }
586 } else {
587 data = (ByteBuf) msg;
588 remoteAddress = (InetSocketAddress) remoteAddress();
589 segmentSize = 0;
590 }
591
592 long bufferAddress = IoUring.memoryAddress(data);
593 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
594 }
595
596 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
597 int bufferLength, int segmentSize, boolean first) {
598 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
599 if (hdr == null) {
600
601
602 return false;
603 }
604 hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
605
606 int fd = fd().intValue();
607 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
608 IoRegistration registration = registration();
609 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
610 long id = registration.submit(ops);
611 if (id == 0) {
612
613 sendmsgHdrs.restoreNextHdr(hdr);
614 return false;
615 }
616 sendmsgHdrs.setId(hdr.idx(), id);
617 return true;
618 }
619
620 @Override
621 protected void freeResourcesNow(IoRegistration reg) {
622 sendmsgHdrs.release();
623 recvmsgHdrs.release();
624 super.freeResourcesNow(reg);
625 }
626 }
627
628 private static IOException translateForConnected(NativeIoException e) {
629
630 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
631 PortUnreachableException error = new PortUnreachableException(e.getMessage());
632 error.initCause(e);
633 return error;
634 }
635 return e;
636 }
637
638
639
640
641
642
643 public static boolean isSegmentedDatagramPacketSupported() {
644 return IoUring.isAvailable();
645 }
646
647 @Override
648 protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
649 if (numOutstandingReads > 0) {
650 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
651 assert canceled == numOutstandingReads;
652 }
653 }
654
655 @Override
656 protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
657 if (numOutstandingWrites > 0) {
658 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
659 assert canceled == numOutstandingWrites;
660 }
661 }
662
663 private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
664 int cancelled = 0;
665 for (int idx = 0; idx < array.length(); idx++) {
666 long id = array.id(idx);
667 if (id == MsgHdrMemoryArray.NO_ID) {
668 continue;
669 }
670
671
672 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
673 registration.submit(ops);
674 cancelled++;
675 }
676 return cancelled;
677 }
678
679 @Override
680 protected boolean socketIsEmpty(int flags) {
681 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
682 }
683
684 @Override
685 boolean isPollInFirst() {
686 return false;
687 }
688 }