1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelException;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.nio.AbstractNioMessageChannel;
30 import io.netty.channel.socket.DatagramChannelConfig;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.channel.socket.InternetProtocolFamily;
33 import io.netty.channel.socket.SocketProtocolFamily;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.ObjectUtil;
36 import io.netty.util.internal.PlatformDependent;
37 import io.netty.util.internal.SocketUtils;
38 import io.netty.util.internal.StringUtil;
39
40 import java.io.IOException;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.NetworkInterface;
44 import java.net.SocketAddress;
45 import java.net.SocketException;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.DatagramChannel;
48 import java.nio.channels.MembershipKey;
49 import java.nio.channels.SelectionKey;
50 import java.nio.channels.UnresolvedAddressException;
51 import java.nio.channels.spi.SelectorProvider;
52 import java.util.ArrayList;
53 import java.util.HashMap;
54 import java.util.Iterator;
55 import java.util.List;
56 import java.util.Map;
57
58
59
60
61
62
63
64
65 public final class NioDatagramChannel
66 extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
67
68 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
69 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
70 private static final String EXPECTED_TYPES =
71 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
72 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
73 StringUtil.simpleClassName(ByteBuf.class) + ", " +
74 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
75 StringUtil.simpleClassName(ByteBuf.class) + ')';
76
77 private final DatagramChannelConfig config;
78
79 private Map<InetAddress, List<MembershipKey>> memberships;
80
81
82
83
84
85
86
87 private static DatagramChannel newSocket(SelectorProvider provider) {
88 try {
89 return provider.openDatagramChannel();
90 } catch (IOException e) {
91 throw new ChannelException("Failed to open a socket.", e);
92 }
93 }
94
95 private static DatagramChannel newSocket(SelectorProvider provider, SocketProtocolFamily ipFamily) {
96 if (ipFamily == null) {
97 return newSocket(provider);
98 }
99
100 try {
101 return provider.openDatagramChannel(ipFamily.toJdkFamily());
102 } catch (IOException e) {
103 throw new ChannelException("Failed to open a socket.", e);
104 }
105 }
106
107
108
109
/**
 * Creates a new instance backed by a socket opened through the default
 * {@link SelectorProvider}.
 */
public NioDatagramChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
113
114
115
116
117
/**
 * Creates a new instance backed by a socket opened through the given provider.
 *
 * @param provider the {@link SelectorProvider} used to open the underlying socket
 */
public NioDatagramChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
121
122
123
124
125
126
127
/**
 * Creates a new instance for the given (legacy) protocol family using the default
 * {@link SelectorProvider}.
 *
 * @param ipFamily the protocol family, or {@code null} for none; it is converted with
 *                 {@code toSocketProtocolFamily()} before delegating
 * @deprecated this constructor delegates to {@link #NioDatagramChannel(SocketProtocolFamily)}
 */
@Deprecated
public NioDatagramChannel(InternetProtocolFamily ipFamily) {
    this(ipFamily != null ? ipFamily.toSocketProtocolFamily() : null);
}
132
133
134
135
136
/**
 * Creates a new instance for the given protocol family using the default
 * {@link SelectorProvider}.
 *
 * @param protocolFamily the protocol family, or {@code null} to open the socket
 *                       without an explicit family
 */
public NioDatagramChannel(SocketProtocolFamily protocolFamily) {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER, protocolFamily));
}
140
141
142
143
144
145
146
147
/**
 * Creates a new instance for the given provider and (legacy) protocol family.
 *
 * @param provider the {@link SelectorProvider} used to open the underlying socket
 * @param ipFamily the protocol family, or {@code null} for none; it is converted with
 *                 {@code toSocketProtocolFamily()} before delegating
 * @deprecated this constructor delegates to
 *             {@link #NioDatagramChannel(SelectorProvider, SocketProtocolFamily)}
 */
@Deprecated
public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
    this(provider, ipFamily != null ? ipFamily.toSocketProtocolFamily() : null);
}
152
153
154
155
156
157
/**
 * Creates a new instance for the given provider and protocol family.
 *
 * @param provider       the {@link SelectorProvider} used to open the underlying socket
 * @param protocolFamily the protocol family, or {@code null} to open the socket
 *                       without an explicit family
 */
public NioDatagramChannel(SelectorProvider provider, SocketProtocolFamily protocolFamily) {
    this(newSocket(provider, protocolFamily));
}
161
162
163
164
/**
 * Creates a new instance wrapping an already opened JDK {@link DatagramChannel}.
 *
 * @param socket the JDK channel this Netty channel operates on
 */
public NioDatagramChannel(DatagramChannel socket) {
    // First argument is null (presumably "no parent channel" - confirm against the
    // superclass constructor); the channel is set up with OP_READ interest.
    super(null, socket, SelectionKey.OP_READ);
    this.config = new NioDatagramChannelConfig(this, socket);
}
169
170 @Override
171 public ChannelMetadata metadata() {
172 return METADATA;
173 }
174
175 @Override
176 public DatagramChannelConfig config() {
177 return config;
178 }
179
180 @Override
181 @SuppressWarnings("deprecation")
182 public boolean isActive() {
183 DatagramChannel ch = javaChannel();
184 return ch.isOpen() && (
185 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
186 || ch.socket().isBound());
187 }
188
189 @Override
190 public boolean isConnected() {
191 return javaChannel().isConnected();
192 }
193
194 @Override
195 protected DatagramChannel javaChannel() {
196 return (DatagramChannel) super.javaChannel();
197 }
198
199 @Override
200 protected SocketAddress localAddress0() {
201 return javaChannel().socket().getLocalSocketAddress();
202 }
203
204 @Override
205 protected SocketAddress remoteAddress0() {
206 return javaChannel().socket().getRemoteSocketAddress();
207 }
208
209 @Override
210 protected void doBind(SocketAddress localAddress) throws Exception {
211 doBind0(localAddress);
212 }
213
214 private void doBind0(SocketAddress localAddress) throws Exception {
215 SocketUtils.bind(javaChannel(), localAddress);
216 }
217
218 @Override
219 protected boolean doConnect(SocketAddress remoteAddress,
220 SocketAddress localAddress) throws Exception {
221 if (localAddress != null) {
222 doBind0(localAddress);
223 }
224
225 boolean success = false;
226 try {
227 javaChannel().connect(remoteAddress);
228 success = true;
229 return true;
230 } finally {
231 if (!success) {
232 doClose();
233 }
234 }
235 }
236
237 @Override
238 protected void doFinishConnect() throws Exception {
239 throw new Error();
240 }
241
242 @Override
243 protected void doDisconnect() throws Exception {
244 javaChannel().disconnect();
245 }
246
247 @Override
248 protected void doClose() throws Exception {
249 javaChannel().close();
250 }
251
252 @Override
253 protected int doReadMessages(List<Object> buf) throws Exception {
254 DatagramChannel ch = javaChannel();
255 DatagramChannelConfig config = config();
256 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
257
258 ByteBuf data = allocHandle.allocate(config.getAllocator());
259 allocHandle.attemptedBytesRead(data.writableBytes());
260 boolean free = true;
261 try {
262 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
263 int pos = nioData.position();
264 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
265 if (remoteAddress == null) {
266 return 0;
267 }
268
269 allocHandle.lastBytesRead(nioData.position() - pos);
270 buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
271 localAddress(), remoteAddress));
272 free = false;
273 return 1;
274 } catch (Throwable cause) {
275 PlatformDependent.throwException(cause);
276 return -1;
277 } finally {
278 if (free) {
279 data.release();
280 }
281 }
282 }
283
284 @Override
285 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
286 final SocketAddress remoteAddress;
287 final ByteBuf data;
288 if (msg instanceof AddressedEnvelope) {
289 @SuppressWarnings("unchecked")
290 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
291 remoteAddress = envelope.recipient();
292 data = envelope.content();
293 } else {
294 data = (ByteBuf) msg;
295 remoteAddress = null;
296 }
297
298 final int dataLen = data.readableBytes();
299 if (dataLen == 0) {
300 return true;
301 }
302
303 final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
304 : data.nioBuffer(data.readerIndex(), dataLen);
305 final int writtenBytes;
306 if (remoteAddress != null) {
307 writtenBytes = javaChannel().send(nioData, remoteAddress);
308 } else {
309 writtenBytes = javaChannel().write(nioData);
310 }
311 return writtenBytes > 0;
312 }
313
314 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
315 if (envelope.recipient() instanceof InetSocketAddress
316 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
317 throw new UnresolvedAddressException();
318 }
319 }
320
321 @Override
322 protected Object filterOutboundMessage(Object msg) {
323 if (msg instanceof DatagramPacket) {
324 DatagramPacket p = (DatagramPacket) msg;
325 checkUnresolved(p);
326 ByteBuf content = p.content();
327 if (isSingleDirectBuffer(content)) {
328 return p;
329 }
330 return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
331 }
332
333 if (msg instanceof ByteBuf) {
334 ByteBuf buf = (ByteBuf) msg;
335 if (isSingleDirectBuffer(buf)) {
336 return buf;
337 }
338 return newDirectBuffer(buf);
339 }
340
341 if (msg instanceof AddressedEnvelope) {
342 @SuppressWarnings("unchecked")
343 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
344 checkUnresolved(e);
345 if (e.content() instanceof ByteBuf) {
346 ByteBuf content = (ByteBuf) e.content();
347 if (isSingleDirectBuffer(content)) {
348 return e;
349 }
350 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
351 }
352 }
353
354 throw new UnsupportedOperationException(
355 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
356 }
357
358
359
360
361
362 private static boolean isSingleDirectBuffer(ByteBuf buf) {
363 return buf.isDirect() && buf.nioBufferCount() == 1;
364 }
365
366 @Override
367 protected boolean continueOnWriteError() {
368
369
370
371 return true;
372 }
373
374 @Override
375 public InetSocketAddress localAddress() {
376 return (InetSocketAddress) super.localAddress();
377 }
378
379 @Override
380 public InetSocketAddress remoteAddress() {
381 return (InetSocketAddress) super.remoteAddress();
382 }
383
384 @Override
385 public ChannelFuture joinGroup(InetAddress multicastAddress) {
386 return joinGroup(multicastAddress, newPromise());
387 }
388
389 @Override
390 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
391 try {
392 NetworkInterface iface = config.getNetworkInterface();
393 if (iface == null) {
394 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
395 }
396 return joinGroup(
397 multicastAddress, iface, null, promise);
398 } catch (SocketException e) {
399 promise.setFailure(e);
400 }
401 return promise;
402 }
403
404 @Override
405 public ChannelFuture joinGroup(
406 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
407 return joinGroup(multicastAddress, networkInterface, newPromise());
408 }
409
410 @Override
411 public ChannelFuture joinGroup(
412 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
413 ChannelPromise promise) {
414 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
415 }
416
417 @Override
418 public ChannelFuture joinGroup(
419 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
420 return joinGroup(multicastAddress, networkInterface, source, newPromise());
421 }
422
423 @Override
424 public ChannelFuture joinGroup(
425 InetAddress multicastAddress, NetworkInterface networkInterface,
426 InetAddress source, ChannelPromise promise) {
427
428 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
429 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
430
431 try {
432 MembershipKey key;
433 if (source == null) {
434 key = javaChannel().join(multicastAddress, networkInterface);
435 } else {
436 key = javaChannel().join(multicastAddress, networkInterface, source);
437 }
438
439 synchronized (this) {
440 List<MembershipKey> keys = null;
441 if (memberships == null) {
442 memberships = new HashMap<InetAddress, List<MembershipKey>>();
443 } else {
444 keys = memberships.get(multicastAddress);
445 }
446 if (keys == null) {
447 keys = new ArrayList<MembershipKey>();
448 memberships.put(multicastAddress, keys);
449 }
450 keys.add(key);
451 }
452
453 promise.setSuccess();
454 } catch (Throwable e) {
455 promise.setFailure(e);
456 }
457
458 return promise;
459 }
460
461 @Override
462 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
463 return leaveGroup(multicastAddress, newPromise());
464 }
465
466 @Override
467 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
468 try {
469 return leaveGroup(
470 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
471 } catch (SocketException e) {
472 promise.setFailure(e);
473 }
474 return promise;
475 }
476
477 @Override
478 public ChannelFuture leaveGroup(
479 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
480 return leaveGroup(multicastAddress, networkInterface, newPromise());
481 }
482
483 @Override
484 public ChannelFuture leaveGroup(
485 InetSocketAddress multicastAddress,
486 NetworkInterface networkInterface, ChannelPromise promise) {
487 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
488 }
489
490 @Override
491 public ChannelFuture leaveGroup(
492 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
493 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
494 }
495
496 @Override
497 public ChannelFuture leaveGroup(
498 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
499 ChannelPromise promise) {
500
501 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
502 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
503
504 synchronized (this) {
505 if (memberships != null) {
506 List<MembershipKey> keys = memberships.get(multicastAddress);
507 if (keys != null) {
508 Iterator<MembershipKey> keyIt = keys.iterator();
509
510 while (keyIt.hasNext()) {
511 MembershipKey key = keyIt.next();
512 if (networkInterface.equals(key.networkInterface())) {
513 if (source == null && key.sourceAddress() == null ||
514 source != null && source.equals(key.sourceAddress())) {
515 key.drop();
516 keyIt.remove();
517 }
518 }
519 }
520 if (keys.isEmpty()) {
521 memberships.remove(multicastAddress);
522 }
523 }
524 }
525 }
526
527 promise.setSuccess();
528 return promise;
529 }
530
531
532
533
534 @Override
535 public ChannelFuture block(
536 InetAddress multicastAddress, NetworkInterface networkInterface,
537 InetAddress sourceToBlock) {
538 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
539 }
540
541
542
543
544 @Override
545 public ChannelFuture block(
546 InetAddress multicastAddress, NetworkInterface networkInterface,
547 InetAddress sourceToBlock, ChannelPromise promise) {
548
549 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
550 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
551 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
552
553 synchronized (this) {
554 if (memberships != null) {
555 List<MembershipKey> keys = memberships.get(multicastAddress);
556 for (MembershipKey key: keys) {
557 if (networkInterface.equals(key.networkInterface())) {
558 try {
559 key.block(sourceToBlock);
560 } catch (IOException e) {
561 promise.setFailure(e);
562 }
563 }
564 }
565 }
566 }
567 promise.setSuccess();
568 return promise;
569 }
570
571
572
573
574
575 @Override
576 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
577 return block(multicastAddress, sourceToBlock, newPromise());
578 }
579
580
581
582
583
584 @Override
585 public ChannelFuture block(
586 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
587 try {
588 return block(
589 multicastAddress,
590 NetworkInterface.getByInetAddress(localAddress().getAddress()),
591 sourceToBlock, promise);
592 } catch (SocketException e) {
593 promise.setFailure(e);
594 }
595 return promise;
596 }
597
598 @Override
599 @Deprecated
600 protected void setReadPending(boolean readPending) {
601 super.setReadPending(readPending);
602 }
603
604 void clearReadPending0() {
605 clearReadPending();
606 }
607
608 @Override
609 protected boolean closeOnReadError(Throwable cause) {
610
611
612 if (cause instanceof SocketException) {
613 return false;
614 }
615 return super.closeOnReadError(cause);
616 }
617
618 @Override
619 protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
620 if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
621
622
623 return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
624 .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
625 }
626 return allocHandle.continueReading();
627 }
628 }