1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.socket.DatagramChannel;
27 import io.netty.channel.socket.DatagramChannelConfig;
28 import io.netty.channel.socket.DatagramPacket;
29 import io.netty.channel.socket.SocketProtocolFamily;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.Errors.NativeIoException;
32 import io.netty.channel.unix.SegmentedDatagramPacket;
33 import io.netty.channel.unix.Socket;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.ObjectUtil;
36 import io.netty.util.internal.StringUtil;
37
38 import java.io.IOException;
39 import java.net.Inet4Address;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.NetworkInterface;
43 import java.net.PortUnreachableException;
44 import java.net.SocketAddress;
45 import java.nio.channels.UnresolvedAddressException;
46
47 import static io.netty.channel.unix.Errors.ioResult;
48
49 public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
50 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
51 private static final String EXPECTED_TYPES =
52 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54 StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56 StringUtil.simpleClassName(ByteBuf.class) + ')';
57
58 private final IoUringDatagramChannelConfig config;
59 private volatile boolean connected;
60
61
62
63
64
65
66
67 private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
68 private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
69 private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
70
71
72
73
74
/**
 * Creates a new channel without an explicit protocol family. Delegates to
 * {@link #IoUringDatagramChannel(SocketProtocolFamily)} with {@code null}, in which case
 * the family is chosen via {@code Socket.isIPv6Preferred()}.
 */
public IoUringDatagramChannel() {
    this((SocketProtocolFamily) null);
}
78
79
80
81
82
/**
 * Creates a new channel backed by a freshly created datagram socket.
 *
 * @param family the protocol family to use; {@code null} selects the family via
 *               {@code Socket.isIPv6Preferred()}
 */
public IoUringDatagramChannel(SocketProtocolFamily family) {
    this(
            LinuxSocket.newSocketDgram(useIpv6(family)),
            false);
}
86
87 private static boolean useIpv6(SocketProtocolFamily family) {
88 if (family == null) {
89 return Socket.isIPv6Preferred();
90 }
91 return family == SocketProtocolFamily.INET6;
92 }
93
94
95
96
97
/**
 * Creates a new channel on top of an existing file descriptor. The descriptor is wrapped in a
 * {@code LinuxSocket} and the channel is created with {@code active == true}.
 *
 * @param fd the file descriptor to wrap
 */
public IoUringDatagramChannel(int fd) {
    this(
            new LinuxSocket(fd),
            true);
}
101
/**
 * Shared constructor: hands the socket (after {@code LinuxSocket.makeBlocking(...)}) to the superclass
 * and creates the channel configuration.
 *
 * @param fd     the socket backing this channel
 * @param active the initial active state forwarded to the superclass
 */
private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
    // The socket is passed through makeBlocking(...) before the superclass sees it.
    super(null, LinuxSocket.makeBlocking(fd), active);
    this.config = new IoUringDatagramChannelConfig(this);
}
107
108 @Override
109 public InetSocketAddress remoteAddress() {
110 return (InetSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public InetSocketAddress localAddress() {
115 return (InetSocketAddress) super.localAddress();
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 public boolean isActive() {
125 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
126 }
127
128 @Override
129 public boolean isConnected() {
130 return connected;
131 }
132
133 @Override
134 public ChannelFuture joinGroup(InetAddress multicastAddress) {
135 return joinGroup(multicastAddress, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140 try {
141 return joinGroup(
142 multicastAddress,
143 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
144 } catch (IOException e) {
145 promise.setFailure(e);
146 }
147 return promise;
148 }
149
150 @Override
151 public ChannelFuture joinGroup(
152 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
153 return joinGroup(multicastAddress, networkInterface, newPromise());
154 }
155
156 @Override
157 public ChannelFuture joinGroup(
158 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
159 ChannelPromise promise) {
160 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
161 }
162
163 @Override
164 public ChannelFuture joinGroup(
165 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
166 return joinGroup(multicastAddress, networkInterface, source, newPromise());
167 }
168
169 @Override
170 public ChannelFuture joinGroup(
171 final InetAddress multicastAddress, final NetworkInterface networkInterface,
172 final InetAddress source, final ChannelPromise promise) {
173
174 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
175 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
176
177 try {
178 socket.joinGroup(multicastAddress, networkInterface, source);
179 promise.setSuccess();
180 } catch (IOException e) {
181 promise.setFailure(e);
182 }
183 return promise;
184 }
185
186 @Override
187 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
188 return leaveGroup(multicastAddress, newPromise());
189 }
190
191 @Override
192 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
193 try {
194 return leaveGroup(
195 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
196 } catch (IOException e) {
197 promise.setFailure(e);
198 }
199 return promise;
200 }
201
202 @Override
203 public ChannelFuture leaveGroup(
204 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
205 return leaveGroup(multicastAddress, networkInterface, newPromise());
206 }
207
208 @Override
209 public ChannelFuture leaveGroup(
210 InetSocketAddress multicastAddress,
211 NetworkInterface networkInterface, ChannelPromise promise) {
212 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
213 }
214
215 @Override
216 public ChannelFuture leaveGroup(
217 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
218 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
219 }
220
221 @Override
222 public ChannelFuture leaveGroup(
223 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
224 final ChannelPromise promise) {
225 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
226 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
227
228 try {
229 socket.leaveGroup(multicastAddress, networkInterface, source);
230 promise.setSuccess();
231 } catch (IOException e) {
232 promise.setFailure(e);
233 }
234 return promise;
235 }
236
237 @Override
238 public ChannelFuture block(
239 InetAddress multicastAddress, NetworkInterface networkInterface,
240 InetAddress sourceToBlock) {
241 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
242 }
243
244 @Override
245 public ChannelFuture block(
246 final InetAddress multicastAddress, final NetworkInterface networkInterface,
247 final InetAddress sourceToBlock, final ChannelPromise promise) {
248 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
249 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
250 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
251
252 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
253 return promise;
254 }
255
256 @Override
257 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
258 return block(multicastAddress, sourceToBlock, newPromise());
259 }
260
261 @Override
262 public ChannelFuture block(
263 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
264 try {
265 return block(
266 multicastAddress,
267 NetworkInterface.getByInetAddress(localAddress().getAddress()),
268 sourceToBlock, promise);
269 } catch (Throwable e) {
270 promise.setFailure(e);
271 }
272 return promise;
273 }
274
275 @Override
276 protected AbstractUnsafe newUnsafe() {
277 return new IoUringDatagramChannelUnsafe();
278 }
279
280 @Override
281 protected void doBind(SocketAddress localAddress) throws Exception {
282 if (localAddress instanceof InetSocketAddress) {
283 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
284 if (socketAddress.getAddress().isAnyLocalAddress() &&
285 socketAddress.getAddress() instanceof Inet4Address) {
286 if (socket.family() == SocketProtocolFamily.INET6) {
287 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
288 }
289 }
290 }
291 super.doBind(localAddress);
292 active = true;
293 }
294
295 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
296 if (envelope.recipient() instanceof InetSocketAddress
297 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
298 throw new UnresolvedAddressException();
299 }
300 }
301
302 @Override
303 protected Object filterOutboundMessage(Object msg) {
304 if (msg instanceof DatagramPacket) {
305 DatagramPacket packet = (DatagramPacket) msg;
306 checkUnresolved(packet);
307 ByteBuf content = packet.content();
308 return !content.hasMemoryAddress() ?
309 packet.replace(newDirectBuffer(packet, content)) : msg;
310 }
311
312 if (msg instanceof ByteBuf) {
313 ByteBuf buf = (ByteBuf) msg;
314 return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
315 }
316
317 if (msg instanceof AddressedEnvelope) {
318 @SuppressWarnings("unchecked")
319 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
320 checkUnresolved(e);
321 if (e.content() instanceof ByteBuf &&
322 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
323
324 ByteBuf content = (ByteBuf) e.content();
325 return !content.hasMemoryAddress()?
326 new DefaultAddressedEnvelope<>(
327 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
328 }
329 }
330
331 throw new UnsupportedOperationException(
332 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
333 }
334
335 @Override
336 public DatagramChannelConfig config() {
337 return config;
338 }
339
340 @Override
341 protected void doDisconnect() throws Exception {
342
343 socket.disconnect();
344 connected = active = false;
345
346 resetCachedAddresses();
347 }
348
349 @Override
350 protected void doClose() throws Exception {
351 super.doClose();
352 connected = false;
353 }
354
355 private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
356 private final WriteProcessor writeProcessor = new WriteProcessor();
357
358 private ByteBuf readBuffer;
359
360 private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
361 private int written;
362 @Override
363 public boolean processMessage(Object msg) {
364 if (scheduleWrite(msg, written == 0)) {
365 written++;
366 return true;
367 }
368 return false;
369 }
370
371 int write(ChannelOutboundBuffer in) {
372 written = 0;
373 try {
374 in.forEachFlushedMessage(this);
375 } catch (Exception e) {
376
377 throw new IllegalStateException(e);
378 }
379 return written;
380 }
381 }
382
383 @Override
384 protected void readComplete0(int res, int flags, int data, int outstanding) {
385 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386 final ChannelPipeline pipeline = pipeline();
387 ByteBuf byteBuf = this.readBuffer;
388 assert byteBuf != null;
389 try {
390 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
391 } catch (Throwable t) {
392 if (connected && t instanceof NativeIoException) {
393 t = translateForConnected((NativeIoException) t);
394 }
395 pipeline.fireExceptionCaught(t);
396 }
397 }
398
399 private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
400 ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
401 throws IOException {
402 MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
403 if (res < 0) {
404 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
405
406
407 allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
408 }
409 } else {
410 allocHandle.lastBytesRead(res);
411 if (hdr.hasPort(IoUringDatagramChannel.this)) {
412 allocHandle.incMessagesRead(1);
413 DatagramPacket packet = hdr.read(
414 IoUringDatagramChannel.this, (IoUringIoHandler) registration().ioHandler(), byteBuf, res);
415 pipeline.fireChannelRead(packet);
416 }
417 }
418
419
420 recvmsgHdrs.setId(idx, -1);
421 if (outstanding == 0) {
422
423
424 this.readBuffer.release();
425 this.readBuffer = null;
426 recvmsgHdrs.clear();
427
428 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
429 if (allocHandle.lastBytesRead() > 0 &&
430 allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
431
432
433
434
435 (!IoUring.isIOUringCqeFSockNonEmptySupported() ||
436 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
437
438 scheduleRead(false);
439 } else {
440
441 allocHandle.readComplete();
442 pipeline.fireChannelReadComplete();
443 }
444 }
445 }
446 }
447
448 @Override
449 protected int scheduleRead0(boolean first) {
450 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
451 ByteBuf byteBuf = allocHandle.allocate(alloc());
452 assert readBuffer == null;
453 readBuffer = byteBuf;
454
455 int writable = byteBuf.writableBytes();
456 allocHandle.attemptedBytesRead(writable);
457 int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
458
459 int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
460
461 int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
462 if (scheduled == 0) {
463
464
465 readBuffer = null;
466 byteBuf.release();
467 }
468 return scheduled;
469 }
470
471 private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
472 int writable = byteBuf.writableBytes();
473 long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
474 if (numDatagram <= 1) {
475 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
476 }
477 int i = 0;
478
479 for (; i < numDatagram && writable >= datagramSize; i++) {
480 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
481 break;
482 }
483 bufferAddress += datagramSize;
484 writable -= datagramSize;
485 }
486 return i;
487 }
488
489 private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
490 MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
491 if (msgHdrMemory == null) {
492
493 return false;
494 }
495 msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
496
497 int fd = fd().intValue();
498 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
499 IoUringIoRegistration registration = registration();
500
501
502 IoUringIoOps ops = IoUringIoOps.newRecvmsg(fd, 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
503 long id = registration.submit(ops);
504 recvmsgHdrs.setId(msgHdrMemory.idx(), id);
505 return true;
506 }
507
508 @Override
509 boolean writeComplete0(int res, int flags, int data, int outstanding) {
510 ChannelOutboundBuffer outboundBuffer = outboundBuffer();
511
512
513 sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
514 sendmsgResArray[data] = res;
515
516 if (outstanding == 0) {
517
518 boolean writtenSomething = false;
519 int numWritten = sendmsgHdrs.length();
520 sendmsgHdrs.clear();
521 for (int i = 0; i < numWritten; i++) {
522 writtenSomething |= removeFromOutboundBuffer(
523 outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
524 }
525 return writtenSomething;
526 }
527 return true;
528 }
529
530 private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
531 if (res >= 0) {
532
533 return outboundBuffer.remove();
534 }
535 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
536 return false;
537 }
538 try {
539 return ioResult(errormsg, res) != 0;
540 } catch (Throwable cause) {
541 return outboundBuffer.remove(cause);
542 }
543 }
544
545 @Override
546 void connectComplete(int res, int flags, short data) {
547 if (res >= 0) {
548 connected = true;
549 }
550 super.connectComplete(res, flags, data);
551 }
552
553 @Override
554 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
555 return writeProcessor.write(in);
556 }
557
558 @Override
559 protected int scheduleWriteSingle(Object msg) {
560 return scheduleWrite(msg, true) ? 1 : 0;
561 }
562
563 private boolean scheduleWrite(Object msg, boolean first) {
564 final ByteBuf data;
565 final InetSocketAddress remoteAddress;
566 final int segmentSize;
567 if (msg instanceof AddressedEnvelope) {
568 @SuppressWarnings("unchecked")
569 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
570 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
571 data = envelope.content();
572 remoteAddress = envelope.recipient();
573 if (msg instanceof SegmentedDatagramPacket) {
574 segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
575 } else {
576 segmentSize = 0;
577 }
578 } else {
579 data = (ByteBuf) msg;
580 remoteAddress = (InetSocketAddress) remoteAddress();
581 segmentSize = 0;
582 }
583
584 long bufferAddress = data.memoryAddress();
585 return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
586 }
587
588 private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
589 int bufferLength, int segmentSize, boolean first) {
590 MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
591 if (hdr == null) {
592
593
594 return false;
595 }
596 hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
597
598 int fd = fd().intValue();
599 int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
600 IoUringIoRegistration registration = registration();
601 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, msgFlags, hdr.address(), hdr.idx());
602 long id = registration.submit(ops);
603 sendmsgHdrs.setId(hdr.idx(), id);
604 return true;
605 }
606
607 @Override
608 protected void freeResourcesNow(IoUringIoRegistration reg) {
609 sendmsgHdrs.release();
610 recvmsgHdrs.release();
611 super.freeResourcesNow(reg);
612 }
613 }
614
615 private static IOException translateForConnected(NativeIoException e) {
616
617 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
618 PortUnreachableException error = new PortUnreachableException(e.getMessage());
619 error.initCause(e);
620 return error;
621 }
622 return e;
623 }
624
625
626
627
628
629
630 public static boolean isSegmentedDatagramPacketSupported() {
631 return IoUring.isAvailable();
632 }
633
634 @Override
635 protected void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
636 if (numOutstandingReads > 0) {
637 int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
638 assert canceled == numOutstandingReads;
639 }
640 }
641
642 @Override
643 protected void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
644 if (numOutstandingWrites > 0) {
645 int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
646 assert canceled == numOutstandingWrites;
647 }
648 }
649
650 private int cancel(IoUringIoRegistration registration, byte op, MsgHdrMemoryArray array) {
651 int cancelled = 0;
652 int fd = fd().intValue();
653 for (int idx = 0; idx < array.length(); idx++) {
654 long id = array.id(idx);
655 if (id == MsgHdrMemoryArray.NO_ID) {
656 continue;
657 }
658
659
660 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, 0, id, op);
661 registration.submit(ops);
662 cancelled++;
663 }
664 return cancelled;
665 }
666 }