1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelException;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.nio.AbstractNioMessageChannel;
30 import io.netty.channel.socket.DatagramChannelConfig;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.channel.socket.InternetProtocolFamily;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.SocketUtils;
36 import io.netty.util.internal.PlatformDependent;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.SuppressJava6Requirement;
39
40 import java.io.IOException;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.NetworkInterface;
44 import java.net.SocketAddress;
45 import java.net.SocketException;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.DatagramChannel;
48 import java.nio.channels.MembershipKey;
49 import java.nio.channels.SelectionKey;
50 import java.nio.channels.UnresolvedAddressException;
51 import java.nio.channels.spi.SelectorProvider;
52 import java.util.ArrayList;
53 import java.util.HashMap;
54 import java.util.Iterator;
55 import java.util.List;
56 import java.util.Map;
57
58
59
60
61
62
63
64
65 public final class NioDatagramChannel
66 extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
67
68 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
69 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
70 private static final String EXPECTED_TYPES =
71 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
72 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
73 StringUtil.simpleClassName(ByteBuf.class) + ", " +
74 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
75 StringUtil.simpleClassName(ByteBuf.class) + ')';
76
77 private final DatagramChannelConfig config;
78
79 private Map<InetAddress, List<MembershipKey>> memberships;
80
81 private static DatagramChannel newSocket(SelectorProvider provider) {
82 try {
83
84
85
86
87
88
89 return provider.openDatagramChannel();
90 } catch (IOException e) {
91 throw new ChannelException("Failed to open a socket.", e);
92 }
93 }
94
95 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
96 private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
97 if (ipFamily == null) {
98 return newSocket(provider);
99 }
100
101 checkJavaVersion();
102
103 try {
104 return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
105 } catch (IOException e) {
106 throw new ChannelException("Failed to open a socket.", e);
107 }
108 }
109
110 private static void checkJavaVersion() {
111 if (PlatformDependent.javaVersion() < 7) {
112 throw new UnsupportedOperationException("Only supported on java 7+.");
113 }
114 }
115
116
117
118
/**
 * Creates a new instance backed by a socket opened through {@code DEFAULT_SELECTOR_PROVIDER}.
 *
 * @throws ChannelException if the socket cannot be opened
 */
public NioDatagramChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
122
123
124
125
126
/**
 * Creates a new instance backed by a socket opened through the given provider.
 *
 * @param provider the {@link SelectorProvider} used to open the underlying socket
 * @throws ChannelException if the socket cannot be opened
 */
public NioDatagramChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
130
131
132
133
134
/**
 * Creates a new instance for the given protocol family, using {@code DEFAULT_SELECTOR_PROVIDER}.
 *
 * @param ipFamily the protocol family of the socket; {@code null} selects the provider default
 * @throws UnsupportedOperationException if a family is given on a Java version below 7
 * @throws ChannelException if the socket cannot be opened
 */
public NioDatagramChannel(InternetProtocolFamily ipFamily) {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
}
138
139
140
141
142
143
/**
 * Creates a new instance for the given protocol family, opened through the given provider.
 *
 * @param provider the {@link SelectorProvider} used to open the underlying socket
 * @param ipFamily the protocol family of the socket; {@code null} selects the provider default
 * @throws UnsupportedOperationException if a family is given on a Java version below 7
 * @throws ChannelException if the socket cannot be opened
 */
public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
    this(newSocket(provider, ipFamily));
}
147
148
149
150
/**
 * Creates a new instance that wraps an already opened JDK {@link DatagramChannel}.
 *
 * <p>The channel is handed to the superclass together with {@link SelectionKey#OP_READ}, and a
 * {@code NioDatagramChannelConfig} bound to this channel and the socket is created.
 *
 * @param socket the JDK channel to wrap
 */
public NioDatagramChannel(DatagramChannel socket) {
    // The first superclass argument is passed as null -- presumably the parent channel; confirm.
    super(null, socket, SelectionKey.OP_READ);
    this.config = new NioDatagramChannelConfig(this, socket);
}
155
156 @Override
157 public ChannelMetadata metadata() {
158 return METADATA;
159 }
160
161 @Override
162 public DatagramChannelConfig config() {
163 return config;
164 }
165
166 @Override
167 @SuppressWarnings("deprecation")
168 public boolean isActive() {
169 DatagramChannel ch = javaChannel();
170 return ch.isOpen() && (
171 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
172 || ch.socket().isBound());
173 }
174
175 @Override
176 public boolean isConnected() {
177 return javaChannel().isConnected();
178 }
179
180 @Override
181 protected DatagramChannel javaChannel() {
182 return (DatagramChannel) super.javaChannel();
183 }
184
185 @Override
186 protected SocketAddress localAddress0() {
187 return javaChannel().socket().getLocalSocketAddress();
188 }
189
190 @Override
191 protected SocketAddress remoteAddress0() {
192 return javaChannel().socket().getRemoteSocketAddress();
193 }
194
195 @Override
196 protected void doBind(SocketAddress localAddress) throws Exception {
197 doBind0(localAddress);
198 }
199
200 private void doBind0(SocketAddress localAddress) throws Exception {
201 if (PlatformDependent.javaVersion() >= 7) {
202 SocketUtils.bind(javaChannel(), localAddress);
203 } else {
204 javaChannel().socket().bind(localAddress);
205 }
206 }
207
208 @Override
209 protected boolean doConnect(SocketAddress remoteAddress,
210 SocketAddress localAddress) throws Exception {
211 if (localAddress != null) {
212 doBind0(localAddress);
213 }
214
215 boolean success = false;
216 try {
217 javaChannel().connect(remoteAddress);
218 success = true;
219 return true;
220 } finally {
221 if (!success) {
222 doClose();
223 }
224 }
225 }
226
227 @Override
228 protected void doFinishConnect() throws Exception {
229 throw new Error();
230 }
231
232 @Override
233 protected void doDisconnect() throws Exception {
234 javaChannel().disconnect();
235 }
236
237 @Override
238 protected void doClose() throws Exception {
239 javaChannel().close();
240 }
241
242 @Override
243 protected int doReadMessages(List<Object> buf) throws Exception {
244 DatagramChannel ch = javaChannel();
245 DatagramChannelConfig config = config();
246 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
247
248 ByteBuf data = allocHandle.allocate(config.getAllocator());
249 allocHandle.attemptedBytesRead(data.writableBytes());
250 boolean free = true;
251 try {
252 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
253 int pos = nioData.position();
254 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
255 if (remoteAddress == null) {
256 return 0;
257 }
258
259 allocHandle.lastBytesRead(nioData.position() - pos);
260 buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
261 localAddress(), remoteAddress));
262 free = false;
263 return 1;
264 } catch (Throwable cause) {
265 PlatformDependent.throwException(cause);
266 return -1;
267 } finally {
268 if (free) {
269 data.release();
270 }
271 }
272 }
273
274 @Override
275 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
276 final SocketAddress remoteAddress;
277 final ByteBuf data;
278 if (msg instanceof AddressedEnvelope) {
279 @SuppressWarnings("unchecked")
280 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
281 remoteAddress = envelope.recipient();
282 data = envelope.content();
283 } else {
284 data = (ByteBuf) msg;
285 remoteAddress = null;
286 }
287
288 final int dataLen = data.readableBytes();
289 if (dataLen == 0) {
290 return true;
291 }
292
293 final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
294 : data.nioBuffer(data.readerIndex(), dataLen);
295 final int writtenBytes;
296 if (remoteAddress != null) {
297 writtenBytes = javaChannel().send(nioData, remoteAddress);
298 } else {
299 writtenBytes = javaChannel().write(nioData);
300 }
301 return writtenBytes > 0;
302 }
303
304 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
305 if (envelope.recipient() instanceof InetSocketAddress
306 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
307 throw new UnresolvedAddressException();
308 }
309 }
310
311 @Override
312 protected Object filterOutboundMessage(Object msg) {
313 if (msg instanceof DatagramPacket) {
314 DatagramPacket p = (DatagramPacket) msg;
315 checkUnresolved(p);
316 ByteBuf content = p.content();
317 if (isSingleDirectBuffer(content)) {
318 return p;
319 }
320 return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
321 }
322
323 if (msg instanceof ByteBuf) {
324 ByteBuf buf = (ByteBuf) msg;
325 if (isSingleDirectBuffer(buf)) {
326 return buf;
327 }
328 return newDirectBuffer(buf);
329 }
330
331 if (msg instanceof AddressedEnvelope) {
332 @SuppressWarnings("unchecked")
333 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
334 checkUnresolved(e);
335 if (e.content() instanceof ByteBuf) {
336 ByteBuf content = (ByteBuf) e.content();
337 if (isSingleDirectBuffer(content)) {
338 return e;
339 }
340 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
341 }
342 }
343
344 throw new UnsupportedOperationException(
345 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
346 }
347
348
349
350
351
352 private static boolean isSingleDirectBuffer(ByteBuf buf) {
353 return buf.isDirect() && buf.nioBufferCount() == 1;
354 }
355
356 @Override
357 protected boolean continueOnWriteError() {
358
359
360
361 return true;
362 }
363
364 @Override
365 public InetSocketAddress localAddress() {
366 return (InetSocketAddress) super.localAddress();
367 }
368
369 @Override
370 public InetSocketAddress remoteAddress() {
371 return (InetSocketAddress) super.remoteAddress();
372 }
373
374 @Override
375 public ChannelFuture joinGroup(InetAddress multicastAddress) {
376 return joinGroup(multicastAddress, newPromise());
377 }
378
379 @Override
380 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
381 try {
382 NetworkInterface iface = config.getNetworkInterface();
383 if (iface == null) {
384 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
385 }
386 return joinGroup(
387 multicastAddress, iface, null, promise);
388 } catch (SocketException e) {
389 promise.setFailure(e);
390 }
391 return promise;
392 }
393
394 @Override
395 public ChannelFuture joinGroup(
396 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
397 return joinGroup(multicastAddress, networkInterface, newPromise());
398 }
399
400 @Override
401 public ChannelFuture joinGroup(
402 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
403 ChannelPromise promise) {
404 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
405 }
406
407 @Override
408 public ChannelFuture joinGroup(
409 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
410 return joinGroup(multicastAddress, networkInterface, source, newPromise());
411 }
412
413 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
414 @Override
415 public ChannelFuture joinGroup(
416 InetAddress multicastAddress, NetworkInterface networkInterface,
417 InetAddress source, ChannelPromise promise) {
418
419 checkJavaVersion();
420
421 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
422 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
423
424 try {
425 MembershipKey key;
426 if (source == null) {
427 key = javaChannel().join(multicastAddress, networkInterface);
428 } else {
429 key = javaChannel().join(multicastAddress, networkInterface, source);
430 }
431
432 synchronized (this) {
433 List<MembershipKey> keys = null;
434 if (memberships == null) {
435 memberships = new HashMap<InetAddress, List<MembershipKey>>();
436 } else {
437 keys = memberships.get(multicastAddress);
438 }
439 if (keys == null) {
440 keys = new ArrayList<MembershipKey>();
441 memberships.put(multicastAddress, keys);
442 }
443 keys.add(key);
444 }
445
446 promise.setSuccess();
447 } catch (Throwable e) {
448 promise.setFailure(e);
449 }
450
451 return promise;
452 }
453
454 @Override
455 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
456 return leaveGroup(multicastAddress, newPromise());
457 }
458
459 @Override
460 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
461 try {
462 return leaveGroup(
463 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
464 } catch (SocketException e) {
465 promise.setFailure(e);
466 }
467 return promise;
468 }
469
470 @Override
471 public ChannelFuture leaveGroup(
472 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
473 return leaveGroup(multicastAddress, networkInterface, newPromise());
474 }
475
476 @Override
477 public ChannelFuture leaveGroup(
478 InetSocketAddress multicastAddress,
479 NetworkInterface networkInterface, ChannelPromise promise) {
480 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
481 }
482
483 @Override
484 public ChannelFuture leaveGroup(
485 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
486 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
487 }
488
489 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
490 @Override
491 public ChannelFuture leaveGroup(
492 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
493 ChannelPromise promise) {
494 checkJavaVersion();
495
496 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
497 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
498
499 synchronized (this) {
500 if (memberships != null) {
501 List<MembershipKey> keys = memberships.get(multicastAddress);
502 if (keys != null) {
503 Iterator<MembershipKey> keyIt = keys.iterator();
504
505 while (keyIt.hasNext()) {
506 MembershipKey key = keyIt.next();
507 if (networkInterface.equals(key.networkInterface())) {
508 if (source == null && key.sourceAddress() == null ||
509 source != null && source.equals(key.sourceAddress())) {
510 key.drop();
511 keyIt.remove();
512 }
513 }
514 }
515 if (keys.isEmpty()) {
516 memberships.remove(multicastAddress);
517 }
518 }
519 }
520 }
521
522 promise.setSuccess();
523 return promise;
524 }
525
526
527
528
529 @Override
530 public ChannelFuture block(
531 InetAddress multicastAddress, NetworkInterface networkInterface,
532 InetAddress sourceToBlock) {
533 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
534 }
535
536
537
538
539 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
540 @Override
541 public ChannelFuture block(
542 InetAddress multicastAddress, NetworkInterface networkInterface,
543 InetAddress sourceToBlock, ChannelPromise promise) {
544 checkJavaVersion();
545
546 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
547 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
548 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
549
550 synchronized (this) {
551 if (memberships != null) {
552 List<MembershipKey> keys = memberships.get(multicastAddress);
553 for (MembershipKey key: keys) {
554 if (networkInterface.equals(key.networkInterface())) {
555 try {
556 key.block(sourceToBlock);
557 } catch (IOException e) {
558 promise.setFailure(e);
559 }
560 }
561 }
562 }
563 }
564 promise.setSuccess();
565 return promise;
566 }
567
568
569
570
571
572 @Override
573 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
574 return block(multicastAddress, sourceToBlock, newPromise());
575 }
576
577
578
579
580
581 @Override
582 public ChannelFuture block(
583 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
584 try {
585 return block(
586 multicastAddress,
587 NetworkInterface.getByInetAddress(localAddress().getAddress()),
588 sourceToBlock, promise);
589 } catch (SocketException e) {
590 promise.setFailure(e);
591 }
592 return promise;
593 }
594
595 @Override
596 @Deprecated
597 protected void setReadPending(boolean readPending) {
598 super.setReadPending(readPending);
599 }
600
601 void clearReadPending0() {
602 clearReadPending();
603 }
604
605 @Override
606 protected boolean closeOnReadError(Throwable cause) {
607
608
609 if (cause instanceof SocketException) {
610 return false;
611 }
612 return super.closeOnReadError(cause);
613 }
614
615 @Override
616 protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
617 if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
618
619
620 return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
621 .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
622 }
623 return allocHandle.continueReading();
624 }
625 }