View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.unix.Buffer;
26  import io.netty.channel.unix.Errors;
27  
28  import java.net.SocketAddress;
29  import java.nio.ByteBuffer;
30  
31  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33  
34  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36  
37      private final ByteBuffer acceptedAddressMemory;
38      private final ByteBuffer acceptedAddressLengthMemory;
39  
40      private final long acceptedAddressMemoryAddress;
41      private final long acceptedAddressLengthMemoryAddress;
42  
43      private long acceptId;
44  
45      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46          super(null, socket, active);
47  
48          acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49          acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50          acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51          // Needs to be initialized to the size of acceptedAddressMemory.
52          // See https://man7.org/linux/man-pages/man2/accept.2.html
53          acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54          acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55      }
56  
57      @Override
58      public final ChannelMetadata metadata() {
59          return METADATA;
60      }
61  
62      @Override
63      protected final void doClose() throws Exception {
64          super.doClose();
65      }
66  
67      @Override
68      protected final AbstractUringUnsafe newUnsafe() {
69          return new UringServerChannelUnsafe();
70      }
71  
72      @Override
73      protected final void doWrite(ChannelOutboundBuffer in) {
74          throw new UnsupportedOperationException();
75      }
76  
77      @Override
78      protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
79          if (acceptId != 0) {
80              assert numOutstandingReads == 1;
81              int fd = fd().intValue();
82              IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
83                      fd, (byte) 0, acceptId, Native.IORING_OP_ACCEPT);
84              registration.submit(ops);
85          } else {
86              assert numOutstandingReads == 0;
87          }
88      }
89  
90      @Override
91      protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
92          assert numOutstandingWrites == 0;
93      }
94  
95      abstract Channel newChildChannel(
96              int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
97  
98      private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
99  
100         @Override
101         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
102             throw new UnsupportedOperationException();
103         }
104 
105         @Override
106         protected int scheduleWriteSingle(Object msg) {
107             throw new UnsupportedOperationException();
108         }
109 
110         @Override
111         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
112             throw new UnsupportedOperationException();
113         }
114 
115         @Override
116         protected int scheduleRead0(boolean first) {
117             assert acceptId == 0;
118             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
119             allocHandle.attemptedBytesRead(1);
120 
121             int fd = fd().intValue();
122             IoUringIoRegistration registration = registration();
123             IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0,
124                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
125             acceptId = registration.submit(ops);
126             if (acceptId == 0) {
127                 return 0;
128             }
129             return 1;
130         }
131 
132         @Override
133         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
134             assert acceptId != 0;
135             acceptId = 0;
136             final IoUringRecvByteAllocatorHandle allocHandle =
137                     (IoUringRecvByteAllocatorHandle) unsafe()
138                             .recvBufAllocHandle();
139             final ChannelPipeline pipeline = pipeline();
140             allocHandle.lastBytesRead(res);
141 
142             if (res >= 0) {
143                 allocHandle.incMessagesRead(1);
144                 try {
145                     Channel channel = newChildChannel(
146                             res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
147                     pipeline.fireChannelRead(channel);
148                     if (allocHandle.continueReading()) {
149                         scheduleRead(false);
150                     } else {
151                         allocHandle.readComplete();
152                         pipeline.fireChannelReadComplete();
153                     }
154                 } catch (Throwable cause) {
155                     allocHandle.readComplete();
156                     pipeline.fireChannelReadComplete();
157                     pipeline.fireExceptionCaught(cause);
158                 }
159             } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
160                 allocHandle.readComplete();
161                 pipeline.fireChannelReadComplete();
162                 // Check if we did fail because there was nothing to accept atm.
163                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
164                     // Something bad happened. Convert to an exception.
165                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
166                 }
167             }
168         }
169 
170         @Override
171         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
172                             final ChannelPromise promise) {
173             promise.setFailure(new UnsupportedOperationException());
174         }
175 
176         @Override
177         protected void freeResourcesNow(IoUringIoRegistration reg) {
178             super.freeResourcesNow(reg);
179             Buffer.free(acceptedAddressMemory);
180             Buffer.free(acceptedAddressLengthMemory);
181         }
182     }
183 }
184