View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.unix.Buffer;
26  import io.netty.channel.unix.Errors;
27  
28  import java.net.SocketAddress;
29  import java.nio.ByteBuffer;
30  
31  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33  
34  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36  
37      private final ByteBuffer acceptedAddressMemory;
38      private final ByteBuffer acceptedAddressLengthMemory;
39  
40      private final long acceptedAddressMemoryAddress;
41      private final long acceptedAddressLengthMemoryAddress;
42  
43      private long acceptId;
44  
45      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46          super(null, socket, active);
47  
48          acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49          acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50          acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51          // Needs to be initialized to the size of acceptedAddressMemory.
52          // See https://man7.org/linux/man-pages/man2/accept.2.html
53          acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54          acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55      }
56  
57      @Override
58      public final ChannelMetadata metadata() {
59          return METADATA;
60      }
61  
62      @Override
63      protected final void doClose() throws Exception {
64          super.doClose();
65          Buffer.free(acceptedAddressMemory);
66          Buffer.free(acceptedAddressLengthMemory);
67      }
68  
69      @Override
70      protected final AbstractUringUnsafe newUnsafe() {
71          return new UringServerChannelUnsafe();
72      }
73  
74      @Override
75      protected final void doWrite(ChannelOutboundBuffer in) {
76          throw new UnsupportedOperationException();
77      }
78  
79      @Override
80      protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
81          if (acceptId != 0) {
82              assert numOutstandingReads == 1;
83              int fd = fd().intValue();
84              IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
85                      fd, 0, acceptId, Native.IORING_OP_ACCEPT);
86              registration.submit(ops);
87          }
88          assert numOutstandingReads == 0;
89      }
90  
91      @Override
92      protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
93          assert numOutstandingWrites == 0;
94      }
95  
96      abstract Channel newChildChannel(
97              int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
98  
99      private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
100 
101         @Override
102         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
103             throw new UnsupportedOperationException();
104         }
105 
106         @Override
107         protected int scheduleWriteSingle(Object msg) {
108             throw new UnsupportedOperationException();
109         }
110 
111         @Override
112         boolean writeComplete0(int res, int flags, int data, int outstanding) {
113             throw new UnsupportedOperationException();
114         }
115 
116         @Override
117         protected int scheduleRead0(boolean first) {
118             assert acceptId == 0;
119             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
120             allocHandle.attemptedBytesRead(1);
121 
122             int fd = fd().intValue();
123             IoUringIoRegistration registration = registration();
124             IoUringIoOps ops = IoUringIoOps.newAccept(fd, 0, 0,
125                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
126             acceptId = registration.submit(ops);
127             return 1;
128         }
129 
130         @Override
131         protected void readComplete0(int res, int flags, int data, int outstanding) {
132             assert acceptId != 0;
133             acceptId = 0;
134             final IoUringRecvByteAllocatorHandle allocHandle =
135                     (IoUringRecvByteAllocatorHandle) unsafe()
136                             .recvBufAllocHandle();
137             final ChannelPipeline pipeline = pipeline();
138             allocHandle.lastBytesRead(res);
139 
140             if (res >= 0) {
141                 allocHandle.incMessagesRead(1);
142                 try {
143                     Channel channel = newChildChannel(
144                             res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
145                     pipeline.fireChannelRead(channel);
146                     if (allocHandle.continueReading()) {
147                         scheduleRead(false);
148                     } else {
149                         allocHandle.readComplete();
150                         pipeline.fireChannelReadComplete();
151                     }
152                 } catch (Throwable cause) {
153                     allocHandle.readComplete();
154                     pipeline.fireChannelReadComplete();
155                     pipeline.fireExceptionCaught(cause);
156                 }
157             } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
158                 allocHandle.readComplete();
159                 pipeline.fireChannelReadComplete();
160                 // Check if we did fail because there was nothing to accept atm.
161                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
162                     // Something bad happened. Convert to an exception.
163                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
164                 }
165             }
166         }
167 
168         @Override
169         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
170                             final ChannelPromise promise) {
171             promise.setFailure(new UnsupportedOperationException());
172         }
173     }
174 }
175