1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.ChannelOutboundBuffer;
20 import io.netty5.channel.ChannelOutboundBuffer.MessageProcessor;
21 import io.netty5.channel.socket.DatagramPacket;
22 import io.netty5.channel.unix.IovArray;
23 import io.netty5.channel.unix.Limits;
24 import io.netty5.channel.unix.SegmentedDatagramPacket;
25 import io.netty5.util.internal.UnstableApi;
26
27 import java.net.Inet6Address;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.UnknownHostException;
31
32 import static io.netty5.channel.unix.Limits.UIO_MAX_IOV;
33 import static io.netty5.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
34
35
36
37
38 final class NativeDatagramPacketArray {
39
40
41 private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
42
43
44
45 private final IovArray iovArray = new IovArray();
46
47
48 private final byte[] ipv4Bytes = new byte[4];
49 private final MyMessageProcessor processor = new MyMessageProcessor();
50
51 private int count;
52
/**
 * Creates the array and eagerly populates every slot, so no packet object has to be allocated
 * later while messages are being added.
 */
NativeDatagramPacketArray() {
    int slot = 0;
    while (slot < packets.length) {
        packets[slot++] = new NativeDatagramPacket();
    }
}
58
59 boolean addWritable(Buffer buf, int segmentLen, InetSocketAddress recipient) {
60 if (count == packets.length) {
61
62
63 return false;
64 }
65 if (buf.writableBytes() == 0) {
66 return true;
67 }
68 int iovArrayStart = iovArray.count();
69 if (iovArrayStart == Limits.IOV_MAX) {
70 return false;
71 }
72 return 0 != buf.forEachWritable(0, (index, component) -> {
73 int writableBytes = component.writableBytes();
74 int byteCount = segmentLen == 0? writableBytes : Math.min(writableBytes, segmentLen);
75 if (iovArray.process(component, byteCount)) {
76 NativeDatagramPacket p = packets[count];
77 p.init(iovArray.memoryAddress(iovArrayStart), iovArray.count() - iovArrayStart, segmentLen, recipient);
78 count++;
79 component.skipWritableBytes(byteCount);
80 return true;
81 }
82 return false;
83 });
84 }
85
86 boolean addReadable(Buffer buf, int segmentLen, InetSocketAddress recipient) {
87 if (count == packets.length) {
88
89
90 return false;
91 }
92 if (buf.readableBytes() == 0) {
93 return true;
94 }
95 int iovArrayStart = iovArray.count();
96 if (iovArrayStart == Limits.IOV_MAX) {
97 return false;
98 }
99 return 0 != buf.forEachReadable(0, (index, component) -> {
100 int writableBytes = component.readableBytes();
101 int byteCount = segmentLen == 0? writableBytes : Math.min(writableBytes, segmentLen);
102 if (iovArray.process(component, byteCount)) {
103 NativeDatagramPacket p = packets[count];
104 long packetAddr = iovArray.memoryAddress(iovArrayStart);
105 p.init(packetAddr, iovArray.count() - iovArrayStart, segmentLen, recipient);
106 count++;
107 component.skipReadableBytes(byteCount);
108 return true;
109 }
110 return false;
111 });
112 }
113
114 void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception {
115 processor.connected = connected;
116 processor.maxMessagesPerWrite = maxMessagesPerWrite;
117 buffer.forEachFlushedMessage(processor);
118 }
119
120
121
122
123 int count() {
124 return count;
125 }
126
127
128
129
130 NativeDatagramPacket[] packets() {
131 return packets;
132 }
133
134 void clear() {
135 count = 0;
136 iovArray.clear();
137 }
138
139 void release() {
140 iovArray.release();
141 }
142
143 private final class MyMessageProcessor implements MessageProcessor {
144 private boolean connected;
145 private int maxMessagesPerWrite;
146
147 @Override
148 public boolean processMessage(Object msg) {
149 final boolean added;
150 if (msg instanceof DatagramPacket) {
151 DatagramPacket packet = (DatagramPacket) msg;
152 Buffer buf = packet.content();
153 int segmentSize = 0;
154 if (packet instanceof SegmentedDatagramPacket) {
155 int seg = ((SegmentedDatagramPacket) packet).segmentSize();
156
157
158 if (buf.readableBytes() > seg) {
159 segmentSize = seg;
160 }
161 }
162 boolean addedAny = false;
163 while (buf.readableBytes() > 0 &&
164 addReadable(buf, segmentSize, (InetSocketAddress) packet.recipient())) {
165 addedAny = true;
166 }
167 added = addedAny;
168 } else if (msg instanceof Buffer && connected) {
169 Buffer buf = (Buffer) msg;
170 boolean addedAny = false;
171 while (buf.readableBytes() > 0 && addReadable(buf, 0, null)) {
172 addedAny = true;
173 }
174 added = addedAny;
175 } else {
176 added = false;
177 }
178 if (added) {
179 maxMessagesPerWrite--;
180 return maxMessagesPerWrite > 0;
181 }
182 return false;
183 }
184 }
185
186 private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes)
187 throws UnknownHostException {
188 final InetAddress address;
189 if (addrLen == ipv4Bytes.length) {
190 System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
191 address = InetAddress.getByAddress(ipv4Bytes);
192 } else {
193 address = Inet6Address.getByAddress(null, addr, scopeId);
194 }
195 return new InetSocketAddress(address, port);
196 }
197
198
199
200
201 @SuppressWarnings("unused")
202 @UnstableApi
203 public final class NativeDatagramPacket {
204
205
206
207
208
209 private long memoryAddress;
210 private int count;
211
212 private final byte[] senderAddr = new byte[16];
213 private int senderAddrLen;
214 private int senderScopeId;
215 private int senderPort;
216
217 private final byte[] recipientAddr = new byte[16];
218 private int recipientAddrLen;
219 private int recipientScopeId;
220 private int recipientPort;
221
222 private int segmentSize;
223
224 private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
225 this.memoryAddress = memoryAddress;
226 this.count = count;
227 this.segmentSize = segmentSize;
228
229 senderScopeId = 0;
230 senderPort = 0;
231 senderAddrLen = 0;
232
233 if (recipient == null) {
234 recipientScopeId = 0;
235 recipientPort = 0;
236 recipientAddrLen = 0;
237 } else {
238 InetAddress address = recipient.getAddress();
239 if (address instanceof Inet6Address) {
240 System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length);
241 recipientScopeId = ((Inet6Address) address).getScopeId();
242 } else {
243 copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr);
244 recipientScopeId = 0;
245 }
246 recipientAddrLen = recipientAddr.length;
247 recipientPort = recipient.getPort();
248 }
249 }
250
251 DatagramPacket newDatagramPacket(Buffer buffer, InetSocketAddress recipient) throws UnknownHostException {
252 InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes);
253 if (recipientAddrLen != 0) {
254 recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes);
255 }
256
257
258 if (segmentSize > 0) {
259 return new SegmentedDatagramPacket(buffer, segmentSize, recipient, sender);
260 }
261 return new DatagramPacket(buffer, recipient, sender);
262 }
263 }
264 }