View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.ChannelOutboundBuffer;
20  import io.netty5.channel.ChannelOutboundBuffer.MessageProcessor;
21  import io.netty5.channel.socket.DatagramPacket;
22  import io.netty5.channel.unix.IovArray;
23  import io.netty5.channel.unix.Limits;
24  import io.netty5.channel.unix.SegmentedDatagramPacket;
25  import io.netty5.util.internal.UnstableApi;
26  
27  import java.net.Inet6Address;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.UnknownHostException;
31  
32  import static io.netty5.channel.unix.Limits.UIO_MAX_IOV;
33  import static io.netty5.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
34  
35  /**
36   * Support <a href="https://linux.die.net//man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
37   */
38  final class NativeDatagramPacketArray {
39  
40      // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
41      private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
42  
43      // We share one IovArray for all NativeDatagramPackets to reduce memory overhead. This will allow us to write
44      // up to IOV_MAX iovec across all messages in one sendmmsg(...) call.
45      private final IovArray iovArray = new IovArray();
46  
47      // temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it.
48      private final byte[] ipv4Bytes = new byte[4];
49      private final MyMessageProcessor processor = new MyMessageProcessor();
50  
51      private int count;
52  
53      NativeDatagramPacketArray() {
54          for (int i = 0; i < packets.length; i++) {
55              packets[i] = new NativeDatagramPacket();
56          }
57      }
58  
59      boolean addWritable(Buffer buf, int segmentLen, InetSocketAddress recipient) {
60          if (count == packets.length) {
61              // We already filled up to UIO_MAX_IOV messages. This is the max allowed per
62              // recvmmsg(...) / sendmmsg(...) call, we will try again later.
63              return false;
64          }
65          if (buf.writableBytes() == 0) {
66              return true;
67          }
68          int iovArrayStart = iovArray.count();
69          if (iovArrayStart == Limits.IOV_MAX) {
70              return false;
71          }
72          return 0 != buf.forEachWritable(0, (index, component) -> {
73              int writableBytes = component.writableBytes();
74              int byteCount = segmentLen == 0? writableBytes : Math.min(writableBytes, segmentLen);
75              if (iovArray.process(component, byteCount)) {
76                  NativeDatagramPacket p = packets[count];
77                  p.init(iovArray.memoryAddress(iovArrayStart), iovArray.count() - iovArrayStart, segmentLen, recipient);
78                  count++;
79                  component.skipWritableBytes(byteCount);
80                  return true;
81              }
82              return false;
83          });
84      }
85  
86      boolean addReadable(Buffer buf, int segmentLen, InetSocketAddress recipient) {
87          if (count == packets.length) {
88              // We already filled up to UIO_MAX_IOV messages. This is the max allowed per
89              // recvmmsg(...) / sendmmsg(...) call, we will try again later.
90              return false;
91          }
92          if (buf.readableBytes() == 0) {
93              return true;
94          }
95          int iovArrayStart = iovArray.count();
96          if (iovArrayStart == Limits.IOV_MAX) {
97              return false;
98          }
99          return 0 != buf.forEachReadable(0, (index, component) -> {
100             int writableBytes = component.readableBytes();
101             int byteCount = segmentLen == 0? writableBytes : Math.min(writableBytes, segmentLen);
102             if (iovArray.process(component, byteCount)) {
103                 NativeDatagramPacket p = packets[count];
104                 long packetAddr = iovArray.memoryAddress(iovArrayStart);
105                 p.init(packetAddr, iovArray.count() - iovArrayStart, segmentLen, recipient);
106                 count++;
107                 component.skipReadableBytes(byteCount);
108                 return true;
109             }
110             return false;
111         });
112     }
113 
114     void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception {
115         processor.connected = connected;
116         processor.maxMessagesPerWrite = maxMessagesPerWrite;
117         buffer.forEachFlushedMessage(processor);
118     }
119 
120     /**
121      * Returns the count
122      */
123     int count() {
124         return count;
125     }
126 
127     /**
128      * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
129      */
130     NativeDatagramPacket[] packets() {
131         return packets;
132     }
133 
134     void clear() {
135         count = 0;
136         iovArray.clear();
137     }
138 
139     void release() {
140         iovArray.release();
141     }
142 
143     private final class MyMessageProcessor implements MessageProcessor {
144         private boolean connected;
145         private int maxMessagesPerWrite;
146 
147         @Override
148         public boolean processMessage(Object msg) {
149             final boolean added;
150             if (msg instanceof DatagramPacket) {
151                 DatagramPacket packet = (DatagramPacket) msg;
152                 Buffer buf = packet.content();
153                 int segmentSize = 0;
154                 if (packet instanceof SegmentedDatagramPacket) {
155                     int seg = ((SegmentedDatagramPacket) packet).segmentSize();
156                     // We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
157                     // segments in the packet.
158                     if (buf.readableBytes() > seg) {
159                         segmentSize = seg;
160                     }
161                 }
162                 boolean addedAny = false;
163                 while (buf.readableBytes() > 0 &&
164                         addReadable(buf, segmentSize, (InetSocketAddress) packet.recipient())) {
165                     addedAny = true;
166                 }
167                 added = addedAny;
168             } else if (msg instanceof Buffer && connected) {
169                 Buffer buf = (Buffer) msg;
170                 boolean addedAny = false;
171                 while (buf.readableBytes() > 0 && addReadable(buf, 0, null)) {
172                     addedAny = true;
173                 }
174                 added = addedAny;
175             } else {
176                 added = false;
177             }
178             if (added) {
179                 maxMessagesPerWrite--;
180                 return maxMessagesPerWrite > 0;
181             }
182             return false;
183         }
184     }
185 
186     private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes)
187             throws UnknownHostException {
188         final InetAddress address;
189         if (addrLen == ipv4Bytes.length) {
190             System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
191             address = InetAddress.getByAddress(ipv4Bytes);
192         } else {
193             address = Inet6Address.getByAddress(null, addr, scopeId);
194         }
195         return new InetSocketAddress(address, port);
196     }
197 
198     /**
199      * Used to pass needed data to JNI.
200      */
201     @SuppressWarnings("unused")
202     @UnstableApi
203     public final class NativeDatagramPacket {
204 
205         // IMPORTANT: Most of the below variables are accessed via JNI. Be aware if you change any of these you also
206         // need to change these in the related .c file!
207 
208         // This is the actual struct iovec*
209         private long memoryAddress;
210         private int count;
211 
212         private final byte[] senderAddr = new byte[16];
213         private int senderAddrLen;
214         private int senderScopeId;
215         private int senderPort;
216 
217         private final byte[] recipientAddr = new byte[16];
218         private int recipientAddrLen;
219         private int recipientScopeId;
220         private int recipientPort;
221 
222         private int segmentSize;
223 
224         private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
225             this.memoryAddress = memoryAddress;
226             this.count = count;
227             this.segmentSize = segmentSize;
228 
229             senderScopeId = 0;
230             senderPort = 0;
231             senderAddrLen = 0;
232 
233             if (recipient == null) {
234                 recipientScopeId = 0;
235                 recipientPort = 0;
236                 recipientAddrLen = 0;
237             } else {
238                 InetAddress address = recipient.getAddress();
239                 if (address instanceof Inet6Address) {
240                     System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length);
241                     recipientScopeId = ((Inet6Address) address).getScopeId();
242                 } else {
243                     copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr);
244                     recipientScopeId = 0;
245                 }
246                 recipientAddrLen = recipientAddr.length;
247                 recipientPort = recipient.getPort();
248             }
249         }
250 
251         DatagramPacket newDatagramPacket(Buffer buffer, InetSocketAddress recipient) throws UnknownHostException {
252             InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes);
253             if (recipientAddrLen != 0) {
254                 recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes);
255             }
256 
257             // UDP_GRO
258             if (segmentSize > 0) {
259                 return new SegmentedDatagramPacket(buffer, segmentSize, recipient, sender);
260             }
261             return new DatagramPacket(buffer, recipient, sender);
262         }
263     }
264 }