1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.socket.DatagramChannel;
27 import io.netty.channel.socket.DatagramChannelConfig;
28 import io.netty.channel.socket.DatagramPacket;
29 import io.netty.channel.socket.SocketProtocolFamily;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.Errors.NativeIoException;
32 import io.netty.channel.unix.SegmentedDatagramPacket;
33 import io.netty.channel.unix.Socket;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.ObjectUtil;
36 import io.netty.util.internal.StringUtil;
37
38 import java.io.IOException;
39 import java.net.Inet4Address;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.NetworkInterface;
43 import java.net.PortUnreachableException;
44 import java.net.SocketAddress;
45 import java.nio.channels.UnresolvedAddressException;
46
47 import static io.netty.channel.unix.Errors.ioResult;
48
49 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
50 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
51 private static final String EXPECTED_TYPES =
52 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54 StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56 StringUtil.simpleClassName(ByteBuf.class) + ')';
57
58 private final IoUringDatagramChannelConfig config;
59 private volatile boolean connected;
60
61
62
63
64
65
66
67 private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
68 private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
69 private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
70
71
72
73
74
/**
 * Creates a new channel without an explicit protocol family; the family is then chosen by
 * {@code Socket.isIPv6Preferred()} (see {@code useIpv6}).
 */
public IoUringDatagramChannel() {
    this((SocketProtocolFamily) null);
}
78
79
80
81
82
/**
 * Creates a new channel backed by a freshly opened datagram socket. The channel starts out
 * as not active.
 *
 * @param family the protocol family to use, or {@code null} to use the platform preference
 */
public IoUringDatagramChannel(SocketProtocolFamily family) {
    this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
}
86
87 private static boolean useIpv6(SocketProtocolFamily family) {
88 if (family == null) {
89 return Socket.isIPv6Preferred();
90 }
91 return family == SocketProtocolFamily.INET6;
92 }
93
94
95
96
97
/**
 * Creates a new channel that wraps an already existing file descriptor. The channel is
 * constructed as active.
 *
 * @param fd the raw file descriptor of the datagram socket
 */
public IoUringDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}
101
/**
 * Shared constructor: hands the socket to the parent class and creates the channel config.
 *
 * @param fd     the socket to use
 * @param active the initial active state passed to the parent class
 */
private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
    // The socket is passed through LinuxSocket.makeBlocking(...) before the parent sees it.
    // NOTE(review): presumably blocking mode is what io_uring expects here - confirm.
    super(null, LinuxSocket.makeBlocking(fd), active);
    this.config = new IoUringDatagramChannelConfig(this);
}
107
108 @Override
109 public InetSocketAddress remoteAddress() {
110 return (InetSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public InetSocketAddress localAddress() {
115 return (InetSocketAddress) super.localAddress();
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 public boolean isActive() {
125 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
126 }
127
128 @Override
129 public boolean isConnected() {
130 return connected;
131 }
132
133 @Override
134 public ChannelFuture joinGroup(InetAddress multicastAddress) {
135 return joinGroup(multicastAddress, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140 try {
141 return joinGroup(
142 multicastAddress,
143 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
144 } catch (IOException e) {
145 promise.setFailure(e);
146 }
147 return promise;
148 }
149
150 @Override
151 public ChannelFuture joinGroup(
152 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
153 return joinGroup(multicastAddress, networkInterface, newPromise());
154 }
155
156 @Override
157 public ChannelFuture joinGroup(
158 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
159 ChannelPromise promise) {
160 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
161 }
162
163 @Override
164 public ChannelFuture joinGroup(
165 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
166 return joinGroup(multicastAddress, networkInterface, source, newPromise());
167 }
168
169 @Override
170 public ChannelFuture joinGroup(
171 final InetAddress multicastAddress, final NetworkInterface networkInterface,
172 final InetAddress source, final ChannelPromise promise) {
173
174 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
175 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
176
177 try {
178 socket.joinGroup(multicastAddress, networkInterface, source);
179 promise.setSuccess();
180 } catch (IOException e) {
181 promise.setFailure(e);
182 }
183 return promise;
184 }
185
186 @Override
187 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
188 return leaveGroup(multicastAddress, newPromise());
189 }
190
191 @Override
192 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
193 try {
194 return leaveGroup(
195 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
196 } catch (IOException e) {
197 promise.setFailure(e);
198 }
199 return promise;
200 }
201
202 @Override
203 public ChannelFuture leaveGroup(
204 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
205 return leaveGroup(multicastAddress, networkInterface, newPromise());
206 }
207
208 @Override
209 public ChannelFuture leaveGroup(
210 InetSocketAddress multicastAddress,
211 NetworkInterface networkInterface, ChannelPromise promise) {
212 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
213 }
214
215 @Override
216 public ChannelFuture leaveGroup(
217 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
218 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
219 }
220
221 @Override
222 public ChannelFuture leaveGroup(
223 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
224 final ChannelPromise promise) {
225 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
226 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
227
228 try {
229 socket.leaveGroup(multicastAddress, networkInterface, source);
230 promise.setSuccess();
231 } catch (IOException e) {
232 promise.setFailure(e);
233 }
234 return promise;
235 }
236
237 @Override
238 public ChannelFuture block(
239 InetAddress multicastAddress, NetworkInterface networkInterface,
240 InetAddress sourceToBlock) {
241 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
242 }
243
244 @Override
245 public ChannelFuture block(
246 final InetAddress multicastAddress, final NetworkInterface networkInterface,
247 final InetAddress sourceToBlock, final ChannelPromise promise) {
248 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
249 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
250 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
251
252 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
253 return promise;
254 }
255
256 @Override
257 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
258 return block(multicastAddress, sourceToBlock, newPromise());
259 }
260
261 @Override
262 public ChannelFuture block(
263 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
264 try {
265 return block(
266 multicastAddress,
267 NetworkInterface.getByInetAddress(localAddress().getAddress()),
268 sourceToBlock, promise);
269 } catch (Throwable e) {
270 promise.setFailure(e);
271 }
272 return promise;
273 }
274
275 @Override
276 protected AbstractUnsafe newUnsafe() {
277 return new IoUringDatagramChannelUnsafe();
278 }
279
280 @Override
281 protected void doBind(SocketAddress localAddress) throws Exception {
282 if (localAddress instanceof InetSocketAddress) {
283 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
284 if (socketAddress.getAddress().isAnyLocalAddress() &&
285 socketAddress.getAddress() instanceof Inet4Address) {
286 if (socket.family() == SocketProtocolFamily.INET6) {
287 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
288 }
289 }
290 }
291 super.doBind(localAddress);
292 active = true;
293 }
294
295 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
296 if (envelope.recipient() instanceof InetSocketAddress
297 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
298 throw new UnresolvedAddressException();
299 }
300 }
301
302 @Override
303 protected Object filterOutboundMessage(Object msg) {
304 if (msg instanceof DatagramPacket) {
305 DatagramPacket packet = (DatagramPacket) msg;
306 checkUnresolved(packet);
307 ByteBuf content = packet.content();
308 return !content.hasMemoryAddress() ?
309 packet.replace(newDirectBuffer(packet, content)) : msg;
310 }
311
312 if (msg instanceof ByteBuf) {
313 ByteBuf buf = (ByteBuf) msg;
314 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
315 }
316
317 if (msg instanceof AddressedEnvelope) {
318 @SuppressWarnings("unchecked")
319 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
320 checkUnresolved(e);
321 if (e.content() instanceof ByteBuf &&
322 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
323
324 ByteBuf content = (ByteBuf) e.content();
325 return !content.hasMemoryAddress()?
326 new DefaultAddressedEnvelope<>(
327 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
328 }
329 }
330
331 throw new UnsupportedOperationException(
332 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
333 }
334
335 @Override
336 public DatagramChannelConfig config() {
337 return config;
338 }
339
340 @Override
341 protected void doDisconnect() throws Exception {
342
343 socket.disconnect();
344 connected = active = false;
345
346 resetCachedAddresses();
347 }
348
349 @Override
350 protected void doClose() throws Exception {
351 super.doClose();
352 connected = false;
353 }
354
355 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
356 private final WriteProcessor writeProcessor = new WriteProcessor();
357
358 private ByteBuf readBuffer;
359
360 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
361 private int written;
362 @Override
363 public boolean processMessage(Object msg) {
364 if (scheduleWrite(msg, written == 0)) {
365 written++;
366 return true;
367 }
368 return false;
369 }
370
371 int write(ChannelOutboundBuffer in) {
372 written = 0;
373 try {
374 in.forEachFlushedMessage(this);
375 } catch (Exception e) {
376
377 throw new IllegalStateException(e);
378 }
379 return written;
380 }
381 }
382
383 @Override
384 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
385 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386 final ChannelPipeline pipeline = pipeline();
387 ByteBuf byteBuf = this.readBuffer;
388 assert byteBuf != null;
389 try {
390 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
391 } catch (Throwable t) {
392 if (connected && t instanceof NativeIoException) {
393 t = translateForConnected((NativeIoException) t);
394 }
395 pipeline.fireExceptionCaught(t);
396 }
397 }
398
399 private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
400 ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
401 throws IOException {
402 MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
403 if (res < 0) {
404 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
405
406
407 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
408 }
409 } else {
410 allocHandle.lastBytesRead(res);
411 if (hdr.hasPort(IoUringDatagramChannel.this)) {
412 allocHandle.incMessagesRead(1);
413 DatagramPacket packet = hdr.read(
414 IoUringDatagramChannel.this, (IoUringIoHandler) registration().ioHandler(), byteBuf, res);
415 pipeline.fireChannelRead(packet);
416 }
417 }
418
419
420 recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
421 if (outstanding == 0) {
422
423
424 this.readBuffer.release();
425 this.readBuffer = null;
426 recvmsgHdrs.clear();
427
428 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
429 if (allocHandle.lastBytesRead() > 0 &&
430 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
431
432
433
434
435 (!IoUring.isIOUringCqeFSockNonEmptySupported() ||
436 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
437
438 scheduleRead(false);
439 } else {
440
441 allocHandle.readComplete();
442 pipeline.fireChannelReadComplete();
443 }
444 }
445 }
446 }
447
448 @Override
449 protected int scheduleRead0(boolean first) {
450 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
451 ByteBuf byteBuf = allocHandle.allocate(alloc());
452 assert readBuffer == null;
453 readBuffer = byteBuf;
454
455 int writable = byteBuf.writableBytes();
456 allocHandle.attemptedBytesRead(writable);
457 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
458
459 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
460
461 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
462 if (scheduled == 0) {
463
464
465 readBuffer = null;
466 byteBuf.release();
467 }
468 return scheduled;
469 }
470
471 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
472 int writable = byteBuf.writableBytes();
473 long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
474 if (numDatagram <= 1) {
475 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
476 }
477 int i = 0;
478
479 for (; i < numDatagram && writable >= datagramSize; i++) {
480 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
481 break;
482 }
483 bufferAddress += datagramSize;
484 writable -= datagramSize;
485 }
486 return i;
487 }
488
489 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
490 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
491 if (msgHdrMemory == null) {
492
493 return false;
494 }
495 msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
496
497 int fd = fd().intValue();
498 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
499 IoUringIoRegistration registration = registration();
500
501
502 IoUringIoOps ops = IoUringIoOps.newRecvmsg(
503 fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
504 long id = registration.submit(ops);
505 if (id == 0) {
506 return false;
507 }
508 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
509 return true;
510 }
511
512 @Override
513 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
514 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
515
516
517 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
518 sendmsgResArray[data] = res;
519
520 if (outstanding == 0) {
521
522 boolean writtenSomething = false;
523 int numWritten = sendmsgHdrs.length();
524 sendmsgHdrs.clear();
525 for (int i = 0; i < numWritten; i++) {
526 writtenSomething |= removeFromOutboundBuffer(
527 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
528 }
529 return writtenSomething;
530 }
531 return true;
532 }
533
534 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
535 if (res >= 0) {
536
537 return outboundBuffer.remove();
538 }
539 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
540 return false;
541 }
542 try {
543 return ioResult(errormsg, res) != 0;
544 } catch (Throwable cause) {
545 return outboundBuffer.remove(cause);
546 }
547 }
548
549 @Override
550 void connectComplete(byte op, int res, int flags, short data) {
551 if (res >= 0) {
552 connected = true;
553 }
554 super.connectComplete(op, res, flags, data);
555 }
556
557 @Override
558 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
559 return writeProcessor.write(in);
560 }
561
562 @Override
563 protected int scheduleWriteSingle(Object msg) {
564 return scheduleWrite(msg, true) ? 1 : 0;
565 }
566
567 private boolean scheduleWrite(Object msg, boolean first) {
568 final ByteBuf data;
569 final InetSocketAddress remoteAddress;
570 final int segmentSize;
571 if (msg instanceof AddressedEnvelope) {
572 @SuppressWarnings("unchecked")
573 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
574 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
575 data = envelope.content();
576 remoteAddress = envelope.recipient();
577 if (msg instanceof SegmentedDatagramPacket) {
578 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
579 } else {
580 segmentSize = 0;
581 }
582 } else {
583 data = (ByteBuf) msg;
584 remoteAddress = (InetSocketAddress) remoteAddress();
585 segmentSize = 0;
586 }
587
588 long bufferAddress = data.memoryAddress();
589 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
590 }
591
592 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
593 int bufferLength, int segmentSize, boolean first) {
594 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
595 if (hdr == null) {
596
597
598 return false;
599 }
600 hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
601
602 int fd = fd().intValue();
603 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
604 IoUringIoRegistration registration = registration();
605 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
606 long id = registration.submit(ops);
607 if (id == 0) {
608 return false;
609 }
610 sendmsgHdrs.setId(hdr.idx(), id);
611 return true;
612 }
613
614 @Override
615 protected void freeResourcesNow(IoUringIoRegistration reg) {
616 sendmsgHdrs.release();
617 recvmsgHdrs.release();
618 super.freeResourcesNow(reg);
619 }
620 }
621
622 private static IOException translateForConnected(NativeIoException e) {
623
624 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
625 PortUnreachableException error = new PortUnreachableException(e.getMessage());
626 error.initCause(e);
627 return error;
628 }
629 return e;
630 }
631
632
633
634
635
636
637 public static boolean isSegmentedDatagramPacketSupported() {
638 return IoUring.isAvailable();
639 }
640
641 @Override
642 protected void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
643 if (numOutstandingReads > 0) {
644 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
645 assert canceled == numOutstandingReads;
646 }
647 }
648
649 @Override
650 protected void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
651 if (numOutstandingWrites > 0) {
652 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
653 assert canceled == numOutstandingWrites;
654 }
655 }
656
657 private int cancel(IoUringIoRegistration registration, byte op, MsgHdrMemoryArray array) {
658 int cancelled = 0;
659 int fd = fd().intValue();
660 for (int idx = 0; idx < array.length(); idx++) {
661 long id = array.id(idx);
662 if (id == MsgHdrMemoryArray.NO_ID) {
663 continue;
664 }
665
666
667 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, (byte) 0, id, op);
668 registration.submit(ops);
669 cancelled++;
670 }
671 return cancelled;
672 }
673 }