1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelException;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.nio.AbstractNioMessageChannel;
30 import io.netty.channel.socket.DatagramChannelConfig;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.channel.socket.InternetProtocolFamily;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.SocketUtils;
36 import io.netty.util.internal.PlatformDependent;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.SuppressJava6Requirement;
39
40 import java.io.IOException;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.NetworkInterface;
44 import java.net.SocketAddress;
45 import java.net.SocketException;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.DatagramChannel;
48 import java.nio.channels.MembershipKey;
49 import java.nio.channels.SelectionKey;
50 import java.nio.channels.spi.SelectorProvider;
51 import java.util.ArrayList;
52 import java.util.HashMap;
53 import java.util.Iterator;
54 import java.util.List;
55 import java.util.Map;
56
57
58
59
60
61
62
63
64 public final class NioDatagramChannel
65 extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
66
67 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
68 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
69 private static final String EXPECTED_TYPES =
70 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
71 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
72 StringUtil.simpleClassName(ByteBuf.class) + ", " +
73 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
74 StringUtil.simpleClassName(ByteBuf.class) + ')';
75
76 private final DatagramChannelConfig config;
77
78 private Map<InetAddress, List<MembershipKey>> memberships;
79
80 private static DatagramChannel newSocket(SelectorProvider provider) {
81 try {
82
83
84
85
86
87
88 return provider.openDatagramChannel();
89 } catch (IOException e) {
90 throw new ChannelException("Failed to open a socket.", e);
91 }
92 }
93
94 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
95 private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
96 if (ipFamily == null) {
97 return newSocket(provider);
98 }
99
100 checkJavaVersion();
101
102 try {
103 return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
104 } catch (IOException e) {
105 throw new ChannelException("Failed to open a socket.", e);
106 }
107 }
108
109 private static void checkJavaVersion() {
110 if (PlatformDependent.javaVersion() < 7) {
111 throw new UnsupportedOperationException("Only supported on java 7+.");
112 }
113 }
114
115
116
117
118 public NioDatagramChannel() {
119 this(newSocket(DEFAULT_SELECTOR_PROVIDER));
120 }
121
122
123
124
125
126 public NioDatagramChannel(SelectorProvider provider) {
127 this(newSocket(provider));
128 }
129
130
131
132
133
134 public NioDatagramChannel(InternetProtocolFamily ipFamily) {
135 this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
136 }
137
138
139
140
141
142
143 public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
144 this(newSocket(provider, ipFamily));
145 }
146
147
148
149
150 public NioDatagramChannel(DatagramChannel socket) {
151 super(null, socket, SelectionKey.OP_READ);
152 config = new NioDatagramChannelConfig(this, socket);
153 }
154
155 @Override
156 public ChannelMetadata metadata() {
157 return METADATA;
158 }
159
160 @Override
161 public DatagramChannelConfig config() {
162 return config;
163 }
164
165 @Override
166 @SuppressWarnings("deprecation")
167 public boolean isActive() {
168 DatagramChannel ch = javaChannel();
169 return ch.isOpen() && (
170 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
171 || ch.socket().isBound());
172 }
173
174 @Override
175 public boolean isConnected() {
176 return javaChannel().isConnected();
177 }
178
179 @Override
180 protected DatagramChannel javaChannel() {
181 return (DatagramChannel) super.javaChannel();
182 }
183
184 @Override
185 protected SocketAddress localAddress0() {
186 return javaChannel().socket().getLocalSocketAddress();
187 }
188
189 @Override
190 protected SocketAddress remoteAddress0() {
191 return javaChannel().socket().getRemoteSocketAddress();
192 }
193
194 @Override
195 protected void doBind(SocketAddress localAddress) throws Exception {
196 doBind0(localAddress);
197 }
198
199 private void doBind0(SocketAddress localAddress) throws Exception {
200 if (PlatformDependent.javaVersion() >= 7) {
201 SocketUtils.bind(javaChannel(), localAddress);
202 } else {
203 javaChannel().socket().bind(localAddress);
204 }
205 }
206
207 @Override
208 protected boolean doConnect(SocketAddress remoteAddress,
209 SocketAddress localAddress) throws Exception {
210 if (localAddress != null) {
211 doBind0(localAddress);
212 }
213
214 boolean success = false;
215 try {
216 javaChannel().connect(remoteAddress);
217 success = true;
218 return true;
219 } finally {
220 if (!success) {
221 doClose();
222 }
223 }
224 }
225
226 @Override
227 protected void doFinishConnect() throws Exception {
228 throw new Error();
229 }
230
231 @Override
232 protected void doDisconnect() throws Exception {
233 javaChannel().disconnect();
234 }
235
236 @Override
237 protected void doClose() throws Exception {
238 javaChannel().close();
239 }
240
241 @Override
242 protected int doReadMessages(List<Object> buf) throws Exception {
243 DatagramChannel ch = javaChannel();
244 DatagramChannelConfig config = config();
245 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
246
247 ByteBuf data = allocHandle.allocate(config.getAllocator());
248 allocHandle.attemptedBytesRead(data.writableBytes());
249 boolean free = true;
250 try {
251 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
252 int pos = nioData.position();
253 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
254 if (remoteAddress == null) {
255 return 0;
256 }
257
258 allocHandle.lastBytesRead(nioData.position() - pos);
259 buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
260 localAddress(), remoteAddress));
261 free = false;
262 return 1;
263 } catch (Throwable cause) {
264 PlatformDependent.throwException(cause);
265 return -1;
266 } finally {
267 if (free) {
268 data.release();
269 }
270 }
271 }
272
273 @Override
274 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
275 final SocketAddress remoteAddress;
276 final ByteBuf data;
277 if (msg instanceof AddressedEnvelope) {
278 @SuppressWarnings("unchecked")
279 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
280 remoteAddress = envelope.recipient();
281 data = envelope.content();
282 } else {
283 data = (ByteBuf) msg;
284 remoteAddress = null;
285 }
286
287 final int dataLen = data.readableBytes();
288 if (dataLen == 0) {
289 return true;
290 }
291
292 final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
293 : data.nioBuffer(data.readerIndex(), dataLen);
294 final int writtenBytes;
295 if (remoteAddress != null) {
296 writtenBytes = javaChannel().send(nioData, remoteAddress);
297 } else {
298 writtenBytes = javaChannel().write(nioData);
299 }
300 return writtenBytes > 0;
301 }
302
303 @Override
304 protected Object filterOutboundMessage(Object msg) {
305 if (msg instanceof DatagramPacket) {
306 DatagramPacket p = (DatagramPacket) msg;
307 ByteBuf content = p.content();
308 if (isSingleDirectBuffer(content)) {
309 return p;
310 }
311 return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
312 }
313
314 if (msg instanceof ByteBuf) {
315 ByteBuf buf = (ByteBuf) msg;
316 if (isSingleDirectBuffer(buf)) {
317 return buf;
318 }
319 return newDirectBuffer(buf);
320 }
321
322 if (msg instanceof AddressedEnvelope) {
323 @SuppressWarnings("unchecked")
324 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325 if (e.content() instanceof ByteBuf) {
326 ByteBuf content = (ByteBuf) e.content();
327 if (isSingleDirectBuffer(content)) {
328 return e;
329 }
330 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
331 }
332 }
333
334 throw new UnsupportedOperationException(
335 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
336 }
337
338
339
340
341
342 private static boolean isSingleDirectBuffer(ByteBuf buf) {
343 return buf.isDirect() && buf.nioBufferCount() == 1;
344 }
345
346 @Override
347 protected boolean continueOnWriteError() {
348
349
350
351 return true;
352 }
353
354 @Override
355 public InetSocketAddress localAddress() {
356 return (InetSocketAddress) super.localAddress();
357 }
358
359 @Override
360 public InetSocketAddress remoteAddress() {
361 return (InetSocketAddress) super.remoteAddress();
362 }
363
364 @Override
365 public ChannelFuture joinGroup(InetAddress multicastAddress) {
366 return joinGroup(multicastAddress, newPromise());
367 }
368
369 @Override
370 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
371 try {
372 NetworkInterface iface = config.getNetworkInterface();
373 if (iface == null) {
374 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
375 }
376 return joinGroup(
377 multicastAddress, iface, null, promise);
378 } catch (SocketException e) {
379 promise.setFailure(e);
380 }
381 return promise;
382 }
383
384 @Override
385 public ChannelFuture joinGroup(
386 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
387 return joinGroup(multicastAddress, networkInterface, newPromise());
388 }
389
390 @Override
391 public ChannelFuture joinGroup(
392 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
393 ChannelPromise promise) {
394 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
395 }
396
397 @Override
398 public ChannelFuture joinGroup(
399 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
400 return joinGroup(multicastAddress, networkInterface, source, newPromise());
401 }
402
403 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
404 @Override
405 public ChannelFuture joinGroup(
406 InetAddress multicastAddress, NetworkInterface networkInterface,
407 InetAddress source, ChannelPromise promise) {
408
409 checkJavaVersion();
410
411 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
412 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
413
414 try {
415 MembershipKey key;
416 if (source == null) {
417 key = javaChannel().join(multicastAddress, networkInterface);
418 } else {
419 key = javaChannel().join(multicastAddress, networkInterface, source);
420 }
421
422 synchronized (this) {
423 List<MembershipKey> keys = null;
424 if (memberships == null) {
425 memberships = new HashMap<InetAddress, List<MembershipKey>>();
426 } else {
427 keys = memberships.get(multicastAddress);
428 }
429 if (keys == null) {
430 keys = new ArrayList<MembershipKey>();
431 memberships.put(multicastAddress, keys);
432 }
433 keys.add(key);
434 }
435
436 promise.setSuccess();
437 } catch (Throwable e) {
438 promise.setFailure(e);
439 }
440
441 return promise;
442 }
443
444 @Override
445 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
446 return leaveGroup(multicastAddress, newPromise());
447 }
448
449 @Override
450 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
451 try {
452 return leaveGroup(
453 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
454 } catch (SocketException e) {
455 promise.setFailure(e);
456 }
457 return promise;
458 }
459
460 @Override
461 public ChannelFuture leaveGroup(
462 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
463 return leaveGroup(multicastAddress, networkInterface, newPromise());
464 }
465
466 @Override
467 public ChannelFuture leaveGroup(
468 InetSocketAddress multicastAddress,
469 NetworkInterface networkInterface, ChannelPromise promise) {
470 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
471 }
472
473 @Override
474 public ChannelFuture leaveGroup(
475 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
476 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
477 }
478
479 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
480 @Override
481 public ChannelFuture leaveGroup(
482 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
483 ChannelPromise promise) {
484 checkJavaVersion();
485
486 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
487 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
488
489 synchronized (this) {
490 if (memberships != null) {
491 List<MembershipKey> keys = memberships.get(multicastAddress);
492 if (keys != null) {
493 Iterator<MembershipKey> keyIt = keys.iterator();
494
495 while (keyIt.hasNext()) {
496 MembershipKey key = keyIt.next();
497 if (networkInterface.equals(key.networkInterface())) {
498 if (source == null && key.sourceAddress() == null ||
499 source != null && source.equals(key.sourceAddress())) {
500 key.drop();
501 keyIt.remove();
502 }
503 }
504 }
505 if (keys.isEmpty()) {
506 memberships.remove(multicastAddress);
507 }
508 }
509 }
510 }
511
512 promise.setSuccess();
513 return promise;
514 }
515
516
517
518
519 @Override
520 public ChannelFuture block(
521 InetAddress multicastAddress, NetworkInterface networkInterface,
522 InetAddress sourceToBlock) {
523 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
524 }
525
526
527
528
529 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
530 @Override
531 public ChannelFuture block(
532 InetAddress multicastAddress, NetworkInterface networkInterface,
533 InetAddress sourceToBlock, ChannelPromise promise) {
534 checkJavaVersion();
535
536 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
537 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
538 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
539
540 synchronized (this) {
541 if (memberships != null) {
542 List<MembershipKey> keys = memberships.get(multicastAddress);
543 for (MembershipKey key: keys) {
544 if (networkInterface.equals(key.networkInterface())) {
545 try {
546 key.block(sourceToBlock);
547 } catch (IOException e) {
548 promise.setFailure(e);
549 }
550 }
551 }
552 }
553 }
554 promise.setSuccess();
555 return promise;
556 }
557
558
559
560
561
562 @Override
563 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
564 return block(multicastAddress, sourceToBlock, newPromise());
565 }
566
567
568
569
570
571 @Override
572 public ChannelFuture block(
573 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
574 try {
575 return block(
576 multicastAddress,
577 NetworkInterface.getByInetAddress(localAddress().getAddress()),
578 sourceToBlock, promise);
579 } catch (SocketException e) {
580 promise.setFailure(e);
581 }
582 return promise;
583 }
584
585 @Override
586 @Deprecated
587 protected void setReadPending(boolean readPending) {
588 super.setReadPending(readPending);
589 }
590
591 void clearReadPending0() {
592 clearReadPending();
593 }
594
595 @Override
596 protected boolean closeOnReadError(Throwable cause) {
597
598
599 if (cause instanceof SocketException) {
600 return false;
601 }
602 return super.closeOnReadError(cause);
603 }
604
605 @Override
606 protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
607 if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
608
609
610 return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
611 .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
612 }
613 return allocHandle.continueReading();
614 }
615 }