View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.IoRegistration;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.SocketProtocolFamily;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.Errors.NativeIoException;
33  import io.netty.channel.unix.SegmentedDatagramPacket;
34  import io.netty.channel.unix.Socket;
35  import io.netty.util.UncheckedBooleanSupplier;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.StringUtil;
38  
39  import java.io.IOException;
40  import java.net.Inet4Address;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.NetworkInterface;
44  import java.net.PortUnreachableException;
45  import java.net.SocketAddress;
46  import java.nio.channels.UnresolvedAddressException;
47  
48  import static io.netty.channel.unix.Errors.ioResult;
49  
50  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
51      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
52      private static final String EXPECTED_TYPES =
53              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
54              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
55              StringUtil.simpleClassName(ByteBuf.class) + ", " +
56              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
57              StringUtil.simpleClassName(ByteBuf.class) + ')';
58  
59      private final IoUringDatagramChannelConfig config;
60      private volatile boolean connected;
61  
62      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
63      //
64      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
65      //       some sort of other pool. Let's keep it simple for now.
66      //
67      // Consider exposing some configuration for that.
68      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
69      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
70      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
71  
72      /**
73       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
74       * on the Operation Systems default which will be chosen.
75       */
76      public IoUringDatagramChannel() {
77          this(null);
78      }
79  
80      /**
81       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
82       * on the Operation Systems default which will be chosen.
83       */
84      public IoUringDatagramChannel(SocketProtocolFamily family) {
85          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
86      }
87  
88      private static boolean useIpv6(SocketProtocolFamily family) {
89          if (family == null) {
90              return Socket.isIPv6Preferred();
91          }
92          return family == SocketProtocolFamily.INET6;
93      }
94  
95      /**
96       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
97       * on the Operation Systems default which will be chosen.
98       */
99      public IoUringDatagramChannel(int fd) {
100         this(new LinuxSocket(fd), true);
101     }
102 
103     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
104         // Always use a blocking fd and so make use of fast-poll.
105         super(null, fd, active);
106         config = new IoUringDatagramChannelConfig(this);
107     }
108 
109     @Override
110     public InetSocketAddress remoteAddress() {
111         return (InetSocketAddress) super.remoteAddress();
112     }
113 
114     @Override
115     public InetSocketAddress localAddress() {
116         return (InetSocketAddress) super.localAddress();
117     }
118 
119     @Override
120     public ChannelMetadata metadata() {
121         return METADATA;
122     }
123 
124     @Override
125     public boolean isActive() {
126         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
127     }
128 
129     @Override
130     public boolean isConnected() {
131         return connected;
132     }
133 
134     @Override
135     public ChannelFuture joinGroup(InetAddress multicastAddress) {
136         return joinGroup(multicastAddress, newPromise());
137     }
138 
139     @Override
140     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141         try {
142             return joinGroup(
143                     multicastAddress,
144                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
145         } catch (IOException e) {
146             promise.setFailure(e);
147         }
148         return promise;
149     }
150 
151     @Override
152     public ChannelFuture joinGroup(
153             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
154         return joinGroup(multicastAddress, networkInterface, newPromise());
155     }
156 
157     @Override
158     public ChannelFuture joinGroup(
159             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
160             ChannelPromise promise) {
161         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
162     }
163 
164     @Override
165     public ChannelFuture joinGroup(
166             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
167         return joinGroup(multicastAddress, networkInterface, source, newPromise());
168     }
169 
170     @Override
171     public ChannelFuture joinGroup(
172             final InetAddress multicastAddress, final NetworkInterface networkInterface,
173             final InetAddress source, final ChannelPromise promise) {
174 
175         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
176         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
177 
178         try {
179             socket.joinGroup(multicastAddress, networkInterface, source);
180             promise.setSuccess();
181         } catch (IOException e) {
182             promise.setFailure(e);
183         }
184         return promise;
185     }
186 
187     @Override
188     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189         return leaveGroup(multicastAddress, newPromise());
190     }
191 
192     @Override
193     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
194         try {
195             return leaveGroup(
196                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
197         } catch (IOException e) {
198             promise.setFailure(e);
199         }
200         return promise;
201     }
202 
203     @Override
204     public ChannelFuture leaveGroup(
205             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
206         return leaveGroup(multicastAddress, networkInterface, newPromise());
207     }
208 
209     @Override
210     public ChannelFuture leaveGroup(
211             InetSocketAddress multicastAddress,
212             NetworkInterface networkInterface, ChannelPromise promise) {
213         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
214     }
215 
216     @Override
217     public ChannelFuture leaveGroup(
218             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
219         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
220     }
221 
222     @Override
223     public ChannelFuture leaveGroup(
224             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
225             final ChannelPromise promise) {
226         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
227         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
228 
229         try {
230             socket.leaveGroup(multicastAddress, networkInterface, source);
231             promise.setSuccess();
232         } catch (IOException e) {
233             promise.setFailure(e);
234         }
235         return promise;
236     }
237 
238     @Override
239     public ChannelFuture block(
240             InetAddress multicastAddress, NetworkInterface networkInterface,
241             InetAddress sourceToBlock) {
242         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
243     }
244 
245     @Override
246     public ChannelFuture block(
247             final InetAddress multicastAddress, final NetworkInterface networkInterface,
248             final InetAddress sourceToBlock, final ChannelPromise promise) {
249         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
250         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
251         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
252 
253         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
254         return promise;
255     }
256 
257     @Override
258     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
259         return block(multicastAddress, sourceToBlock, newPromise());
260     }
261 
262     @Override
263     public ChannelFuture block(
264             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
265         try {
266             return block(
267                     multicastAddress,
268                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
269                     sourceToBlock, promise);
270         } catch (Throwable e) {
271             promise.setFailure(e);
272         }
273         return promise;
274     }
275 
276     @Override
277     protected AbstractUnsafe newUnsafe() {
278         return new IoUringDatagramChannelUnsafe();
279     }
280 
281     @Override
282     protected void doBind(SocketAddress localAddress) throws Exception {
283         if (localAddress instanceof InetSocketAddress) {
284             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
285             if (socketAddress.getAddress().isAnyLocalAddress() &&
286                     socketAddress.getAddress() instanceof Inet4Address) {
287                 if (socket.family() == SocketProtocolFamily.INET6) {
288                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
289                 }
290             }
291         }
292         super.doBind(localAddress);
293         active = true;
294     }
295 
296     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297         if (envelope.recipient() instanceof InetSocketAddress
298                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299             throw new UnresolvedAddressException();
300         }
301     }
302 
303     @Override
304     protected Object filterOutboundMessage(Object msg) {
305         if (msg instanceof DatagramPacket) {
306             DatagramPacket packet = (DatagramPacket) msg;
307             checkUnresolved(packet);
308             ByteBuf content = packet.content();
309             return !content.hasMemoryAddress() ?
310                     packet.replace(newDirectBuffer(packet, content)) : msg;
311         }
312 
313         if (msg instanceof ByteBuf) {
314             ByteBuf buf = (ByteBuf) msg;
315             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
316         }
317 
318         if (msg instanceof AddressedEnvelope) {
319             @SuppressWarnings("unchecked")
320             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
321             checkUnresolved(e);
322             if (e.content() instanceof ByteBuf &&
323                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
324 
325                 ByteBuf content = (ByteBuf) e.content();
326                 return !content.hasMemoryAddress()?
327                         new DefaultAddressedEnvelope<>(
328                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
329             }
330         }
331 
332         throw new UnsupportedOperationException(
333                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
334     }
335 
336     @Override
337     public DatagramChannelConfig config() {
338         return config;
339     }
340 
341     @Override
342     protected void doDisconnect() throws Exception {
343         // TODO: use io_uring for this too...
344         socket.disconnect();
345         connected = active = false;
346 
347         resetCachedAddresses();
348     }
349 
350     @Override
351     protected void doClose() throws Exception {
352         super.doClose();
353         connected = false;
354     }
355 
356     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
357         private final WriteProcessor writeProcessor = new WriteProcessor();
358 
359         private ByteBuf readBuffer;
360 
361         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
362             private int written;
363             @Override
364             public boolean processMessage(Object msg) {
365                 if (scheduleWrite(msg, written == 0)) {
366                     written++;
367                     return true;
368                 }
369                 return false;
370             }
371 
372             int write(ChannelOutboundBuffer in) {
373                 written = 0;
374                 try {
375                     in.forEachFlushedMessage(this);
376                 } catch (Exception e) {
377                     // This should never happen as our processMessage(...) never throws.
378                     throw new IllegalStateException(e);
379                 }
380                 return written;
381             }
382         }
383 
384         @Override
385         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
386             assert outstanding != -1 : "multi-shot not implemented yet";
387 
388             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389             final ChannelPipeline pipeline = pipeline();
390             ByteBuf byteBuf = this.readBuffer;
391             assert byteBuf != null;
392             try {
393                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
394             } catch (Throwable t) {
395                 Throwable e = (connected && t instanceof NativeIoException) ?
396                   translateForConnected((NativeIoException) t) : t;
397                 pipeline.fireExceptionCaught(e);
398             }
399         }
400 
401         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
402                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
403                 throws IOException {
404             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
405             if (res < 0) {
406                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
407                     // If res is negative we should pass it to ioResult(...) which will either throw
408                     // or convert it to 0 if we could not read because the socket was not readable.
409                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
410                 }
411             } else {
412                 allocHandle.lastBytesRead(res);
413                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
414                     allocHandle.incMessagesRead(1);
415                     DatagramPacket packet = hdr.get(
416                             IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
417                     pipeline.fireChannelRead(packet);
418                 }
419             }
420 
421             // Reset the id as this read was completed and so don't need to be cancelled later.
422             recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
423             if (outstanding == 0) {
424                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
425                 // another one or if the user will do it.
426                 this.readBuffer.release();
427                 this.readBuffer = null;
428                 recvmsgHdrs.clear();
429 
430                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
431                     if (allocHandle.lastBytesRead() > 0 &&
432                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
433                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
434                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
435                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
436                             // be able to read any data. This is useless work and we can skip it.
437                             (!IoUring.isCqeFSockNonEmptySupported() ||
438                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
439                         // Let's schedule another read.
440                         scheduleRead(false);
441                     } else {
442                         // the read was completed with EAGAIN.
443                         allocHandle.readComplete();
444                         pipeline.fireChannelReadComplete();
445                     }
446                 }
447             }
448         }
449 
450         @Override
451         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
452             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
453             ByteBuf byteBuf = allocHandle.allocate(alloc());
454             assert readBuffer == null;
455             readBuffer = byteBuf;
456 
457             int writable = byteBuf.writableBytes();
458             allocHandle.attemptedBytesRead(writable);
459             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
460 
461             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
462 
463             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
464             if (scheduled == 0) {
465                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
466                 // completion event.
467                 readBuffer = null;
468                 byteBuf.release();
469             }
470             return scheduled;
471         }
472 
473         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
474             int writable = byteBuf.writableBytes();
475             long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
476             if (numDatagram <= 1) {
477                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
478             }
479             int i = 0;
480             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
481             for (; i < numDatagram && writable >= datagramSize; i++) {
482                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
483                     break;
484                 }
485                 bufferAddress += datagramSize;
486                 writable -= datagramSize;
487             }
488             return i;
489         }
490 
491         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
492             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
493             if (msgHdrMemory == null) {
494                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
495                 return false;
496             }
497             msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
498 
499             int fd = fd().intValue();
500             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
501             IoRegistration registration = registration();
502             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
503             // readComplete0(...)
504             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
505                     fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
506             long id = registration.submit(ops);
507             if (id == 0) {
508                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
509                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
510                 return false;
511             }
512             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
513             return true;
514         }
515 
516         @Override
517         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
518             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
519 
520             // Reset the id as this write was completed and so don't need to be cancelled later.
521             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
522             sendmsgResArray[data] = res;
523             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
524             if (outstanding == 0) {
525                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
526                 boolean writtenSomething = false;
527                 int numWritten = sendmsgHdrs.length();
528                 sendmsgHdrs.clear();
529                 for (int i = 0; i < numWritten; i++) {
530                     writtenSomething |= removeFromOutboundBuffer(
531                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
532                 }
533                 return writtenSomething;
534             }
535             return true;
536         }
537 
538         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
539             if (res >= 0) {
540                 // When using Datagram we should consider the message written as long as res is not negative.
541                 return outboundBuffer.remove();
542             }
543             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
544                 return false;
545             }
546             try {
547                 return ioResult(errormsg, res) != 0;
548             } catch (Throwable cause) {
549                 return outboundBuffer.remove(cause);
550             }
551         }
552 
553         @Override
554         void connectComplete(byte op, int res, int flags, short data) {
555             if (res >= 0) {
556                 connected = true;
557             }
558             super.connectComplete(op, res, flags, data);
559         }
560 
561         @Override
562         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
563             return writeProcessor.write(in);
564         }
565 
566         @Override
567         protected int scheduleWriteSingle(Object msg) {
568             return scheduleWrite(msg, true) ? 1 : 0;
569         }
570 
571         private boolean scheduleWrite(Object msg, boolean first) {
572             final ByteBuf data;
573             final InetSocketAddress remoteAddress;
574             final int segmentSize;
575             if (msg instanceof AddressedEnvelope) {
576                 @SuppressWarnings("unchecked")
577                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
578                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
579                 data = envelope.content();
580                 remoteAddress = envelope.recipient();
581                 if (msg instanceof SegmentedDatagramPacket) {
582                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
583                 } else {
584                     segmentSize = 0;
585                 }
586             } else {
587                 data = (ByteBuf) msg;
588                 remoteAddress = (InetSocketAddress) remoteAddress();
589                 segmentSize = 0;
590             }
591 
592             long bufferAddress = IoUring.memoryAddress(data);
593             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
594         }
595 
596         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
597                                         int bufferLength, int segmentSize, boolean first) {
598             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
599             if (hdr == null) {
600                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
601                 // before we can write again.
602                 return false;
603             }
604             hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
605 
606             int fd = fd().intValue();
607             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
608             IoRegistration registration = registration();
609             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
610             long id = registration.submit(ops);
611             if (id == 0) {
612                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
613                 sendmsgHdrs.restoreNextHdr(hdr);
614                 return false;
615             }
616             sendmsgHdrs.setId(hdr.idx(), id);
617             return true;
618         }
619 
620         @Override
621         protected void freeResourcesNow(IoRegistration reg) {
622             sendmsgHdrs.release();
623             recvmsgHdrs.release();
624             super.freeResourcesNow(reg);
625         }
626     }
627 
628     private static IOException translateForConnected(NativeIoException e) {
629         // We need to correctly translate connect errors to match NIO behaviour.
630         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
631             PortUnreachableException error = new PortUnreachableException(e.getMessage());
632             error.initCause(e);
633             return error;
634         }
635         return e;
636     }
637 
638     /**
639      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
640      *
641      * @return {@code true} if supported, {@code false} otherwise.
642      */
643     public static boolean isSegmentedDatagramPacketSupported() {
644         return IoUring.isAvailable();
645     }
646 
647     @Override
648     protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
649         if (numOutstandingReads > 0) {
650             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
651             assert canceled == numOutstandingReads;
652         }
653     }
654 
655     @Override
656     protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
657         if (numOutstandingWrites > 0) {
658             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
659             assert canceled == numOutstandingWrites;
660         }
661     }
662 
663     private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
664         int cancelled = 0;
665         for (int idx = 0; idx < array.length(); idx++) {
666             long id = array.id(idx);
667             if (id == MsgHdrMemoryArray.NO_ID) {
668                 continue;
669             }
670             // Let's try to cancel outstanding op as these might be submitted and waiting for data
671             // (via fastpoll).
672             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
673             registration.submit(ops);
674             cancelled++;
675         }
676         return cancelled;
677     }
678 
679     @Override
680     protected boolean socketIsEmpty(int flags) {
681         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
682     }
683 
684     @Override
685     boolean isPollInFirst() {
686         return false;
687     }
688 }