1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelOutboundBuffer;
20 import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
21 import io.netty.channel.socket.DatagramPacket;
22 import io.netty.channel.unix.IovArray;
23 import io.netty.channel.unix.Limits;
24 import io.netty.channel.unix.SegmentedDatagramPacket;
25 import io.netty.util.internal.UnstableApi;
26
27 import java.net.Inet6Address;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.UnknownHostException;
31
32 import static io.netty.channel.unix.Limits.UIO_MAX_IOV;
33 import static io.netty.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
34
35
36
37
38 final class NativeDatagramPacketArray {
39
40
41 private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
42
43
44
45 private final IovArray iovArray = new IovArray();
46
47
48 private final byte[] ipv4Bytes = new byte[4];
49 private final MyMessageProcessor processor = new MyMessageProcessor();
50
51 private int count;
52
53 NativeDatagramPacketArray() {
54 for (int i = 0; i < packets.length; i++) {
55 packets[i] = new NativeDatagramPacket();
56 }
57 }
58
59 boolean addWritable(ByteBuf buf, int index, int len) {
60 return add0(buf, index, len, 0, null);
61 }
62
63 private boolean add0(ByteBuf buf, int index, int len, int segmentLen, InetSocketAddress recipient) {
64 if (count == packets.length) {
65
66
67 return false;
68 }
69 if (len == 0) {
70 return true;
71 }
72 int offset = iovArray.count();
73 if (offset == Limits.IOV_MAX || !iovArray.add(buf, index, len)) {
74
75 return false;
76 }
77 NativeDatagramPacket p = packets[count];
78 p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, segmentLen, recipient);
79
80 count++;
81 return true;
82 }
83
84 void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception {
85 processor.connected = connected;
86 processor.maxMessagesPerWrite = maxMessagesPerWrite;
87 buffer.forEachFlushedMessage(processor);
88 }
89
90
91
92
93 int count() {
94 return count;
95 }
96
97
98
99
100 NativeDatagramPacket[] packets() {
101 return packets;
102 }
103
104 void clear() {
105 this.count = 0;
106 this.iovArray.clear();
107 }
108
109 void release() {
110 iovArray.release();
111 }
112
113 private final class MyMessageProcessor implements MessageProcessor {
114 private boolean connected;
115 private int maxMessagesPerWrite;
116
117 @Override
118 public boolean processMessage(Object msg) {
119 final boolean added;
120 if (msg instanceof DatagramPacket) {
121 DatagramPacket packet = (DatagramPacket) msg;
122 ByteBuf buf = packet.content();
123 int segmentSize = 0;
124 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
125 int seg = ((io.netty.channel.unix.SegmentedDatagramPacket) packet).segmentSize();
126
127
128 if (buf.readableBytes() > seg) {
129 segmentSize = seg;
130 }
131 }
132 added = add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient());
133 } else if (msg instanceof ByteBuf && connected) {
134 ByteBuf buf = (ByteBuf) msg;
135 added = add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null);
136 } else {
137 added = false;
138 }
139 if (added) {
140 maxMessagesPerWrite--;
141 return maxMessagesPerWrite > 0;
142 }
143 return false;
144 }
145 }
146
147 private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes)
148 throws UnknownHostException {
149 final InetAddress address;
150 if (addrLen == ipv4Bytes.length) {
151 System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
152 address = InetAddress.getByAddress(ipv4Bytes);
153 } else {
154 address = Inet6Address.getByAddress(null, addr, scopeId);
155 }
156 return new InetSocketAddress(address, port);
157 }
158
159
160
161
162 @SuppressWarnings("unused")
163 @UnstableApi
164 public final class NativeDatagramPacket {
165
166
167
168
169
170 private long memoryAddress;
171 private int count;
172
173 private final byte[] senderAddr = new byte[16];
174 private int senderAddrLen;
175 private int senderScopeId;
176 private int senderPort;
177
178 private final byte[] recipientAddr = new byte[16];
179 private int recipientAddrLen;
180 private int recipientScopeId;
181 private int recipientPort;
182
183 private int segmentSize;
184
185 private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
186 this.memoryAddress = memoryAddress;
187 this.count = count;
188 this.segmentSize = segmentSize;
189
190 this.senderScopeId = 0;
191 this.senderPort = 0;
192 this.senderAddrLen = 0;
193
194 if (recipient == null) {
195 this.recipientScopeId = 0;
196 this.recipientPort = 0;
197 this.recipientAddrLen = 0;
198 } else {
199 InetAddress address = recipient.getAddress();
200 if (address instanceof Inet6Address) {
201 System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length);
202 recipientScopeId = ((Inet6Address) address).getScopeId();
203 } else {
204 copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr);
205 recipientScopeId = 0;
206 }
207 recipientAddrLen = recipientAddr.length;
208 recipientPort = recipient.getPort();
209 }
210 }
211
212 boolean hasSender() {
213 return senderPort > 0;
214 }
215
216 DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException {
217 InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes);
218 if (recipientAddrLen != 0) {
219 recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes);
220 }
221
222
223 ByteBuf slice = buffer.retainedSlice(buffer.readerIndex(), count);
224
225
226 if (segmentSize > 0) {
227 return new SegmentedDatagramPacket(slice, segmentSize, recipient, sender);
228 }
229 return new DatagramPacket(slice, recipient, sender);
230 }
231 }
232 }