View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.IoRegistration;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.SocketProtocolFamily;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.Errors.NativeIoException;
33  import io.netty.channel.unix.SegmentedDatagramPacket;
34  import io.netty.channel.unix.Socket;
35  import io.netty.util.UncheckedBooleanSupplier;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.StringUtil;
38  
39  import java.io.IOException;
40  import java.net.Inet4Address;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.NetworkInterface;
44  import java.net.PortUnreachableException;
45  import java.net.SocketAddress;
46  import java.nio.channels.UnresolvedAddressException;
47  
48  import static io.netty.channel.unix.Errors.ioResult;
49  
50  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
51      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
52      private static final String EXPECTED_TYPES =
53              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
54              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
55              StringUtil.simpleClassName(ByteBuf.class) + ", " +
56              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
57              StringUtil.simpleClassName(ByteBuf.class) + ')';
58  
59      private final IoUringDatagramChannelConfig config;
60      private volatile boolean connected;
61  
62      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
63      //
    // TODO: Alternatively we could allocate these every time from the ByteBufAllocator, or we could use
    //       some other sort of pool. Let's keep it simple for now.
66      //
67      // Consider exposing some configuration for that.
68      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
69      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
70      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
71  
72      /**
73       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
74       * on the Operation Systems default which will be chosen.
75       */
76      public IoUringDatagramChannel() {
77          this(null);
78      }
79  
80      /**
81       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
82       * on the Operation Systems default which will be chosen.
83       */
84      public IoUringDatagramChannel(SocketProtocolFamily family) {
85          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
86      }
87  
88      private static boolean useIpv6(SocketProtocolFamily family) {
89          if (family == null) {
90              return Socket.isIPv6Preferred();
91          }
92          return family == SocketProtocolFamily.INET6;
93      }
94  
95      /**
96       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
97       * on the Operation Systems default which will be chosen.
98       */
99      public IoUringDatagramChannel(int fd) {
100         this(new LinuxSocket(fd), true);
101     }
102 
103     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
104         // Always use a blocking fd and so make use of fast-poll.
105         super(null, fd, active);
106         config = new IoUringDatagramChannelConfig(this);
107     }
108 
109     @Override
110     public InetSocketAddress remoteAddress() {
111         return (InetSocketAddress) super.remoteAddress();
112     }
113 
114     @Override
115     public InetSocketAddress localAddress() {
116         return (InetSocketAddress) super.localAddress();
117     }
118 
119     @Override
120     public ChannelMetadata metadata() {
121         return METADATA;
122     }
123 
124     @Override
125     public boolean isActive() {
126         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
127     }
128 
129     @Override
130     public boolean isConnected() {
131         return connected;
132     }
133 
134     @Override
135     public ChannelFuture joinGroup(InetAddress multicastAddress) {
136         return joinGroup(multicastAddress, newPromise());
137     }
138 
139     @Override
140     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141         try {
142             return joinGroup(
143                     multicastAddress,
144                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
145         } catch (IOException e) {
146             promise.setFailure(e);
147         }
148         return promise;
149     }
150 
151     @Override
152     public ChannelFuture joinGroup(
153             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
154         return joinGroup(multicastAddress, networkInterface, newPromise());
155     }
156 
157     @Override
158     public ChannelFuture joinGroup(
159             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
160             ChannelPromise promise) {
161         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
162     }
163 
164     @Override
165     public ChannelFuture joinGroup(
166             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
167         return joinGroup(multicastAddress, networkInterface, source, newPromise());
168     }
169 
170     @Override
171     public ChannelFuture joinGroup(
172             final InetAddress multicastAddress, final NetworkInterface networkInterface,
173             final InetAddress source, final ChannelPromise promise) {
174 
175         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
176         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
177 
178         try {
179             socket.joinGroup(multicastAddress, networkInterface, source);
180             promise.setSuccess();
181         } catch (IOException e) {
182             promise.setFailure(e);
183         }
184         return promise;
185     }
186 
187     @Override
188     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189         return leaveGroup(multicastAddress, newPromise());
190     }
191 
192     @Override
193     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
194         try {
195             return leaveGroup(
196                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
197         } catch (IOException e) {
198             promise.setFailure(e);
199         }
200         return promise;
201     }
202 
203     @Override
204     public ChannelFuture leaveGroup(
205             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
206         return leaveGroup(multicastAddress, networkInterface, newPromise());
207     }
208 
209     @Override
210     public ChannelFuture leaveGroup(
211             InetSocketAddress multicastAddress,
212             NetworkInterface networkInterface, ChannelPromise promise) {
213         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
214     }
215 
216     @Override
217     public ChannelFuture leaveGroup(
218             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
219         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
220     }
221 
222     @Override
223     public ChannelFuture leaveGroup(
224             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
225             final ChannelPromise promise) {
226         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
227         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
228 
229         try {
230             socket.leaveGroup(multicastAddress, networkInterface, source);
231             promise.setSuccess();
232         } catch (IOException e) {
233             promise.setFailure(e);
234         }
235         return promise;
236     }
237 
238     @Override
239     public ChannelFuture block(
240             InetAddress multicastAddress, NetworkInterface networkInterface,
241             InetAddress sourceToBlock) {
242         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
243     }
244 
245     @Override
246     public ChannelFuture block(
247             final InetAddress multicastAddress, final NetworkInterface networkInterface,
248             final InetAddress sourceToBlock, final ChannelPromise promise) {
249         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
250         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
251         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
252 
253         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
254         return promise;
255     }
256 
257     @Override
258     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
259         return block(multicastAddress, sourceToBlock, newPromise());
260     }
261 
262     @Override
263     public ChannelFuture block(
264             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
265         try {
266             return block(
267                     multicastAddress,
268                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
269                     sourceToBlock, promise);
270         } catch (Throwable e) {
271             promise.setFailure(e);
272         }
273         return promise;
274     }
275 
276     @Override
277     protected AbstractUnsafe newUnsafe() {
278         return new IoUringDatagramChannelUnsafe();
279     }
280 
281     @Override
282     protected void doBind(SocketAddress localAddress) throws Exception {
283         if (localAddress instanceof InetSocketAddress) {
284             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
285             if (socketAddress.getAddress().isAnyLocalAddress() &&
286                     socketAddress.getAddress() instanceof Inet4Address) {
287                 if (socket.family() == SocketProtocolFamily.INET6) {
288                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
289                 }
290             }
291         }
292         super.doBind(localAddress);
293         active = true;
294     }
295 
296     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297         if (envelope.recipient() instanceof InetSocketAddress
298                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299             throw new UnresolvedAddressException();
300         }
301     }
302 
303     @Override
304     protected Object filterOutboundMessage(Object msg) {
305         if (msg instanceof DatagramPacket) {
306             DatagramPacket packet = (DatagramPacket) msg;
307             checkUnresolved(packet);
308             ByteBuf content = packet.content();
309             return !content.hasMemoryAddress() ?
310                     packet.replace(newDirectBuffer(packet, content)) : msg;
311         }
312 
313         if (msg instanceof ByteBuf) {
314             ByteBuf buf = (ByteBuf) msg;
315             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
316         }
317 
318         if (msg instanceof AddressedEnvelope) {
319             @SuppressWarnings("unchecked")
320             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
321             checkUnresolved(e);
322             if (e.content() instanceof ByteBuf &&
323                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
324 
325                 ByteBuf content = (ByteBuf) e.content();
326                 return !content.hasMemoryAddress()?
327                         new DefaultAddressedEnvelope<>(
328                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
329             }
330         }
331 
332         throw new UnsupportedOperationException(
333                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
334     }
335 
336     @Override
337     public DatagramChannelConfig config() {
338         return config;
339     }
340 
341     @Override
342     protected void doDisconnect() throws Exception {
343         // TODO: use io_uring for this too...
344         socket.disconnect();
345         connected = active = false;
346 
347         resetCachedAddresses();
348     }
349 
350     @Override
351     protected void doClose() throws Exception {
352         super.doClose();
353         connected = false;
354     }
355 
356     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
357         private final WriteProcessor writeProcessor = new WriteProcessor();
358 
359         private ByteBuf readBuffer;
360 
361         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
362             private int written;
363             @Override
364             public boolean processMessage(Object msg) {
365                 if (scheduleWrite(msg, written == 0)) {
366                     written++;
367                     return true;
368                 }
369                 return false;
370             }
371 
372             int write(ChannelOutboundBuffer in) {
373                 written = 0;
374                 try {
375                     in.forEachFlushedMessage(this);
376                 } catch (Exception e) {
377                     // This should never happen as our processMessage(...) never throws.
378                     throw new IllegalStateException(e);
379                 }
380                 return written;
381             }
382         }
383 
384         @Override
385         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
386             assert outstanding != -1 : "multi-shot not implemented yet";
387 
388             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389             final ChannelPipeline pipeline = pipeline();
390             ByteBuf byteBuf = this.readBuffer;
391             assert byteBuf != null;
392             try {
393                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
394             } catch (Throwable t) {
395                 if (connected && t instanceof NativeIoException) {
396                     t = translateForConnected((NativeIoException) t);
397                 }
398                 pipeline.fireExceptionCaught(t);
399             }
400         }
401 
402         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
403                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
404                 throws IOException {
405             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
406             if (res < 0) {
407                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
408                     // If res is negative we should pass it to ioResult(...) which will either throw
409                     // or convert it to 0 if we could not read because the socket was not readable.
410                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
411                 }
412             } else {
413                 allocHandle.lastBytesRead(res);
414                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
415                     allocHandle.incMessagesRead(1);
416                     DatagramPacket packet = hdr.read(
417                             IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
418                     pipeline.fireChannelRead(packet);
419                 }
420             }
421 
422             // Reset the id as this read was completed and so don't need to be cancelled later.
423             recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
424             if (outstanding == 0) {
425                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
426                 // another one or if the user will do it.
427                 this.readBuffer.release();
428                 this.readBuffer = null;
429                 recvmsgHdrs.clear();
430 
431                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
432                     if (allocHandle.lastBytesRead() > 0 &&
433                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
434                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
435                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
436                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
437                             // be able to read any data. This is useless work and we can skip it.
438                             (!IoUring.isCqeFSockNonEmptySupported() ||
439                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
440                         // Let's schedule another read.
441                         scheduleRead(false);
442                     } else {
443                         // the read was completed with EAGAIN.
444                         allocHandle.readComplete();
445                         pipeline.fireChannelReadComplete();
446                     }
447                 }
448             }
449         }
450 
451         @Override
452         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
453             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
454             ByteBuf byteBuf = allocHandle.allocate(alloc());
455             assert readBuffer == null;
456             readBuffer = byteBuf;
457 
458             int writable = byteBuf.writableBytes();
459             allocHandle.attemptedBytesRead(writable);
460             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
461 
462             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
463 
464             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
465             if (scheduled == 0) {
466                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
467                 // completion event.
468                 readBuffer = null;
469                 byteBuf.release();
470             }
471             return scheduled;
472         }
473 
474         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
475             int writable = byteBuf.writableBytes();
476             long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
477             if (numDatagram <= 1) {
478                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
479             }
480             int i = 0;
481             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
482             for (; i < numDatagram && writable >= datagramSize; i++) {
483                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
484                     break;
485                 }
486                 bufferAddress += datagramSize;
487                 writable -= datagramSize;
488             }
489             return i;
490         }
491 
492         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
493             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
494             if (msgHdrMemory == null) {
495                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
496                 return false;
497             }
498             msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
499 
500             int fd = fd().intValue();
501             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
502             IoRegistration registration = registration();
503             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
504             // readComplete0(...)
505             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
506                     fd, flags((byte) 0), msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
507             long id = registration.submit(ops);
508             if (id == 0) {
509                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
510                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
511                 return false;
512             }
513             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
514             return true;
515         }
516 
517         @Override
518         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
519             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
520 
521             // Reset the id as this write was completed and so don't need to be cancelled later.
522             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
523             sendmsgResArray[data] = res;
524             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
525             if (outstanding == 0) {
526                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
527                 boolean writtenSomething = false;
528                 int numWritten = sendmsgHdrs.length();
529                 sendmsgHdrs.clear();
530                 for (int i = 0; i < numWritten; i++) {
531                     writtenSomething |= removeFromOutboundBuffer(
532                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
533                 }
534                 return writtenSomething;
535             }
536             return true;
537         }
538 
539         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
540             if (res >= 0) {
541                 // When using Datagram we should consider the message written as long as res is not negative.
542                 return outboundBuffer.remove();
543             }
544             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
545                 return false;
546             }
547             try {
548                 return ioResult(errormsg, res) != 0;
549             } catch (Throwable cause) {
550                 return outboundBuffer.remove(cause);
551             }
552         }
553 
554         @Override
555         void connectComplete(byte op, int res, int flags, short data) {
556             if (res >= 0) {
557                 connected = true;
558             }
559             super.connectComplete(op, res, flags, data);
560         }
561 
562         @Override
563         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
564             return writeProcessor.write(in);
565         }
566 
567         @Override
568         protected int scheduleWriteSingle(Object msg) {
569             return scheduleWrite(msg, true) ? 1 : 0;
570         }
571 
572         private boolean scheduleWrite(Object msg, boolean first) {
573             final ByteBuf data;
574             final InetSocketAddress remoteAddress;
575             final int segmentSize;
576             if (msg instanceof AddressedEnvelope) {
577                 @SuppressWarnings("unchecked")
578                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
579                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
580                 data = envelope.content();
581                 remoteAddress = envelope.recipient();
582                 if (msg instanceof SegmentedDatagramPacket) {
583                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
584                 } else {
585                     segmentSize = 0;
586                 }
587             } else {
588                 data = (ByteBuf) msg;
589                 remoteAddress = (InetSocketAddress) remoteAddress();
590                 segmentSize = 0;
591             }
592 
593             long bufferAddress = data.memoryAddress();
594             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
595         }
596 
597         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
598                                         int bufferLength, int segmentSize, boolean first) {
599             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
600             if (hdr == null) {
601                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
602                 // before we can write again.
603                 return false;
604             }
605             hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
606 
607             int fd = fd().intValue();
608             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
609             IoRegistration registration = registration();
610             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), msgFlags, hdr.address(), hdr.idx());
611             long id = registration.submit(ops);
612             if (id == 0) {
613                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
614                 sendmsgHdrs.restoreNextHdr(hdr);
615                 return false;
616             }
617             sendmsgHdrs.setId(hdr.idx(), id);
618             return true;
619         }
620 
621         @Override
622         protected void freeResourcesNow(IoRegistration reg) {
623             sendmsgHdrs.release();
624             recvmsgHdrs.release();
625             super.freeResourcesNow(reg);
626         }
627     }
628 
629     private static IOException translateForConnected(NativeIoException e) {
630         // We need to correctly translate connect errors to match NIO behaviour.
631         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
632             PortUnreachableException error = new PortUnreachableException(e.getMessage());
633             error.initCause(e);
634             return error;
635         }
636         return e;
637     }
638 
639     /**
640      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
641      *
642      * @return {@code true} if supported, {@code false} otherwise.
643      */
644     public static boolean isSegmentedDatagramPacketSupported() {
645         return IoUring.isAvailable();
646     }
647 
648     @Override
649     protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
650         if (numOutstandingReads > 0) {
651             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
652             assert canceled == numOutstandingReads;
653         }
654     }
655 
656     @Override
657     protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
658         if (numOutstandingWrites > 0) {
659             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
660             assert canceled == numOutstandingWrites;
661         }
662     }
663 
664     private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
665         int cancelled = 0;
666         for (int idx = 0; idx < array.length(); idx++) {
667             long id = array.id(idx);
668             if (id == MsgHdrMemoryArray.NO_ID) {
669                 continue;
670             }
671             // Let's try to cancel outstanding op as these might be submitted and waiting for data
672             // (via fastpoll).
673             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), id, op);
674             registration.submit(ops);
675             cancelled++;
676         }
677         return cancelled;
678     }
679 
680     @Override
681     protected boolean socketIsEmpty(int flags) {
682         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
683     }
684 
685     @Override
686     boolean isPollInFirst() {
687         return false;
688     }
689 }