1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.unix.DatagramSocketAddress;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.IovArray;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.UnstableApi;
37
38 import java.net.InetAddress;
39 import java.net.InetSocketAddress;
40 import java.net.NetworkInterface;
41 import java.net.PortUnreachableException;
42 import java.net.SocketAddress;
43 import java.net.SocketException;
44 import java.nio.ByteBuffer;
45
46 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
47
48 @UnstableApi
49 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50 private static final String EXPECTED_TYPES =
51 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53 StringUtil.simpleClassName(ByteBuf.class) + ", " +
54 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55 StringUtil.simpleClassName(ByteBuf.class) + ')';
56
57 private volatile boolean connected;
58 private final KQueueDatagramChannelConfig config;
59
60 public KQueueDatagramChannel() {
61 super(null, newSocketDgram(), false);
62 config = new KQueueDatagramChannelConfig(this);
63 }
64
65 public KQueueDatagramChannel(InternetProtocolFamily protocol) {
66 super(null, newSocketDgram(protocol), false);
67 config = new KQueueDatagramChannelConfig(this);
68 }
69
70 public KQueueDatagramChannel(int fd) {
71 this(new BsdSocket(fd), true);
72 }
73
74 KQueueDatagramChannel(BsdSocket socket, boolean active) {
75 super(null, socket, active);
76 config = new KQueueDatagramChannelConfig(this);
77 }
78
79 @Override
80 public InetSocketAddress remoteAddress() {
81 return (InetSocketAddress) super.remoteAddress();
82 }
83
84 @Override
85 public InetSocketAddress localAddress() {
86 return (InetSocketAddress) super.localAddress();
87 }
88
89 @Override
90 @SuppressWarnings("deprecation")
91 public boolean isActive() {
92 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
93 }
94
95 @Override
96 public boolean isConnected() {
97 return connected;
98 }
99
100 @Override
101 public ChannelFuture joinGroup(InetAddress multicastAddress) {
102 return joinGroup(multicastAddress, newPromise());
103 }
104
105 @Override
106 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
107 try {
108 NetworkInterface iface = config().getNetworkInterface();
109 if (iface == null) {
110 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
111 }
112 return joinGroup(multicastAddress, iface, null, promise);
113 } catch (SocketException e) {
114 promise.setFailure(e);
115 }
116 return promise;
117 }
118
119 @Override
120 public ChannelFuture joinGroup(
121 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
122 return joinGroup(multicastAddress, networkInterface, newPromise());
123 }
124
125 @Override
126 public ChannelFuture joinGroup(
127 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
128 ChannelPromise promise) {
129 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
130 }
131
132 @Override
133 public ChannelFuture joinGroup(
134 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
135 return joinGroup(multicastAddress, networkInterface, source, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(
140 final InetAddress multicastAddress, final NetworkInterface networkInterface,
141 final InetAddress source, final ChannelPromise promise) {
142
143 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
144 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
145
146 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
147 return promise;
148 }
149
150 @Override
151 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
152 return leaveGroup(multicastAddress, newPromise());
153 }
154
155 @Override
156 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
157 try {
158 return leaveGroup(
159 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
160 } catch (SocketException e) {
161 promise.setFailure(e);
162 }
163 return promise;
164 }
165
166 @Override
167 public ChannelFuture leaveGroup(
168 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
169 return leaveGroup(multicastAddress, networkInterface, newPromise());
170 }
171
172 @Override
173 public ChannelFuture leaveGroup(
174 InetSocketAddress multicastAddress,
175 NetworkInterface networkInterface, ChannelPromise promise) {
176 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
177 }
178
179 @Override
180 public ChannelFuture leaveGroup(
181 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
182 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
183 }
184
185 @Override
186 public ChannelFuture leaveGroup(
187 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
188 final ChannelPromise promise) {
189 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
190 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
191
192 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
193
194 return promise;
195 }
196
197 @Override
198 public ChannelFuture block(
199 InetAddress multicastAddress, NetworkInterface networkInterface,
200 InetAddress sourceToBlock) {
201 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
202 }
203
204 @Override
205 public ChannelFuture block(
206 final InetAddress multicastAddress, final NetworkInterface networkInterface,
207 final InetAddress sourceToBlock, final ChannelPromise promise) {
208 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
210 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
212 return promise;
213 }
214
215 @Override
216 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
217 return block(multicastAddress, sourceToBlock, newPromise());
218 }
219
220 @Override
221 public ChannelFuture block(
222 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
223 try {
224 return block(
225 multicastAddress,
226 NetworkInterface.getByInetAddress(localAddress().getAddress()),
227 sourceToBlock, promise);
228 } catch (Throwable e) {
229 promise.setFailure(e);
230 }
231 return promise;
232 }
233
234 @Override
235 protected AbstractKQueueUnsafe newUnsafe() {
236 return new KQueueDatagramChannelUnsafe();
237 }
238
239 @Override
240 protected void doBind(SocketAddress localAddress) throws Exception {
241 super.doBind(localAddress);
242 active = true;
243 }
244
245 @Override
246 protected boolean doWriteMessage(Object msg) throws Exception {
247 final ByteBuf data;
248 InetSocketAddress remoteAddress;
249 if (msg instanceof AddressedEnvelope) {
250 @SuppressWarnings("unchecked")
251 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
252 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
253 data = envelope.content();
254 remoteAddress = envelope.recipient();
255 } else {
256 data = (ByteBuf) msg;
257 remoteAddress = null;
258 }
259
260 final int dataLen = data.readableBytes();
261 if (dataLen == 0) {
262 return true;
263 }
264
265 final long writtenBytes;
266 if (data.hasMemoryAddress()) {
267 long memoryAddress = data.memoryAddress();
268 if (remoteAddress == null) {
269 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
270 } else {
271 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
272 remoteAddress.getAddress(), remoteAddress.getPort());
273 }
274 } else if (data.nioBufferCount() > 1) {
275 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
276 array.add(data, data.readerIndex(), data.readableBytes());
277 int cnt = array.count();
278 assert cnt != 0;
279
280 if (remoteAddress == null) {
281 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
282 } else {
283 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
284 remoteAddress.getAddress(), remoteAddress.getPort());
285 }
286 } else {
287 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
288 if (remoteAddress == null) {
289 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
290 } else {
291 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
292 remoteAddress.getAddress(), remoteAddress.getPort());
293 }
294 }
295
296 return writtenBytes > 0;
297 }
298
299 @Override
300 protected Object filterOutboundMessage(Object msg) {
301 if (msg instanceof DatagramPacket) {
302 DatagramPacket packet = (DatagramPacket) msg;
303 ByteBuf content = packet.content();
304 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
305 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
306 }
307
308 if (msg instanceof ByteBuf) {
309 ByteBuf buf = (ByteBuf) msg;
310 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
311 }
312
313 if (msg instanceof AddressedEnvelope) {
314 @SuppressWarnings("unchecked")
315 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
316 if (e.content() instanceof ByteBuf &&
317 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
318
319 ByteBuf content = (ByteBuf) e.content();
320 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
321 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
322 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
323 }
324 }
325
326 throw new UnsupportedOperationException(
327 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
328 }
329
330 @Override
331 public KQueueDatagramChannelConfig config() {
332 return config;
333 }
334
335 @Override
336 protected void doDisconnect() throws Exception {
337 socket.disconnect();
338 connected = active = false;
339 resetCachedAddresses();
340 }
341
342 @Override
343 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
344 if (super.doConnect(remoteAddress, localAddress)) {
345 connected = true;
346 return true;
347 }
348 return false;
349 }
350
351 @Override
352 protected void doClose() throws Exception {
353 super.doClose();
354 connected = false;
355 }
356
357 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
358
359 @Override
360 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
361 assert eventLoop().inEventLoop();
362 final DatagramChannelConfig config = config();
363 if (shouldBreakReadReady(config)) {
364 clearReadFilter0();
365 return;
366 }
367 final ChannelPipeline pipeline = pipeline();
368 final ByteBufAllocator allocator = config.getAllocator();
369 allocHandle.reset(config);
370 readReadyBefore();
371
372 Throwable exception = null;
373 try {
374 ByteBuf byteBuf = null;
375 try {
376 boolean connected = isConnected();
377 do {
378 byteBuf = allocHandle.allocate(allocator);
379 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
380
381 final DatagramPacket packet;
382 if (connected) {
383 try {
384 allocHandle.lastBytesRead(doReadBytes(byteBuf));
385 } catch (Errors.NativeIoException e) {
386
387 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
388 PortUnreachableException error = new PortUnreachableException(e.getMessage());
389 error.initCause(e);
390 throw error;
391 }
392 throw e;
393 }
394 if (allocHandle.lastBytesRead() <= 0) {
395
396 byteBuf.release();
397 byteBuf = null;
398 break;
399 }
400 packet = new DatagramPacket(byteBuf,
401 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
402 } else {
403 final DatagramSocketAddress remoteAddress;
404 if (byteBuf.hasMemoryAddress()) {
405
406 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
407 byteBuf.capacity());
408 } else {
409 ByteBuffer nioData = byteBuf.internalNioBuffer(
410 byteBuf.writerIndex(), byteBuf.writableBytes());
411 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
412 }
413
414 if (remoteAddress == null) {
415 allocHandle.lastBytesRead(-1);
416 byteBuf.release();
417 byteBuf = null;
418 break;
419 }
420 InetSocketAddress localAddress = remoteAddress.localAddress();
421 if (localAddress == null) {
422 localAddress = (InetSocketAddress) localAddress();
423 }
424 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
425 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
426
427 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
428 }
429
430 allocHandle.incMessagesRead(1);
431
432 readPending = false;
433 pipeline.fireChannelRead(packet);
434
435 byteBuf = null;
436
437
438
439 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
440 } catch (Throwable t) {
441 if (byteBuf != null) {
442 byteBuf.release();
443 }
444 exception = t;
445 }
446
447 allocHandle.readComplete();
448 pipeline.fireChannelReadComplete();
449
450 if (exception != null) {
451 pipeline.fireExceptionCaught(exception);
452 }
453 } finally {
454 readReadyFinally(config);
455 }
456 }
457 }
458 }