1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelException;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.nio.AbstractNioMessageChannel;
30 import io.netty.channel.socket.DatagramChannelConfig;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.channel.socket.InternetProtocolFamily;
33 import io.netty.util.internal.SocketUtils;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.UnstableApi;
37
38 import java.io.IOException;
39 import java.net.InetAddress;
40 import java.net.InetSocketAddress;
41 import java.net.NetworkInterface;
42 import java.net.SocketAddress;
43 import java.net.SocketException;
44 import java.nio.ByteBuffer;
45 import java.nio.channels.DatagramChannel;
46 import java.nio.channels.MembershipKey;
47 import java.nio.channels.SelectionKey;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.ArrayList;
50 import java.util.HashMap;
51 import java.util.Iterator;
52 import java.util.List;
53 import java.util.Map;
54
55
56
57
58
59
60
61
62 public final class NioDatagramChannel
63 extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
64
65 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
66 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
67 private static final String EXPECTED_TYPES =
68 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
69 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
70 StringUtil.simpleClassName(ByteBuf.class) + ", " +
71 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
72 StringUtil.simpleClassName(ByteBuf.class) + ')';
73
74 private final DatagramChannelConfig config;
75
76 private Map<InetAddress, List<MembershipKey>> memberships;
77 private RecvByteBufAllocator.Handle allocHandle;
78
79 private static DatagramChannel newSocket(SelectorProvider provider) {
80 try {
81
82
83
84
85
86
87 return provider.openDatagramChannel();
88 } catch (IOException e) {
89 throw new ChannelException("Failed to open a socket.", e);
90 }
91 }
92
93 private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
94 if (ipFamily == null) {
95 return newSocket(provider);
96 }
97
98 checkJavaVersion();
99
100 try {
101 return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
102 } catch (IOException e) {
103 throw new ChannelException("Failed to open a socket.", e);
104 }
105 }
106
107 private static void checkJavaVersion() {
108 if (PlatformDependent.javaVersion() < 7) {
109 throw new UnsupportedOperationException("Only supported on java 7+.");
110 }
111 }
112
113
114
115
/**
 * Creates a new instance backed by a channel opened through the default
 * {@link SelectorProvider}, without specifying a protocol family.
 */
public NioDatagramChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}
119
120
121
122
123
/**
 * Creates a new instance backed by a channel opened through the given
 * {@link SelectorProvider}, without specifying a protocol family.
 *
 * @param provider the provider used to open the underlying channel
 */
public NioDatagramChannel(SelectorProvider provider) {
    // A null family makes the two-argument newSocket() open the channel without a family.
    this(provider, null);
}
127
128
129
130
131
/**
 * Creates a new instance backed by a channel opened through the default
 * {@link SelectorProvider} for the given protocol family.
 *
 * @param ipFamily the protocol family to use, or {@code null} to not specify one
 */
public NioDatagramChannel(InternetProtocolFamily ipFamily) {
    this(DEFAULT_SELECTOR_PROVIDER, ipFamily);
}
135
136
137
138
139
140
/**
 * Creates a new instance backed by a channel opened through the given
 * {@link SelectorProvider} for the given protocol family.
 *
 * @param provider the provider used to open the underlying channel
 * @param ipFamily the protocol family to use, or {@code null} to not specify one
 */
public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
    this(NioDatagramChannel.newSocket(provider, ipFamily));
}
144
145
146
147
/**
 * Creates a new instance that wraps an already opened JDK {@link DatagramChannel}.
 * The channel is handed to the superclass with no parent and {@code OP_READ} as its
 * read interest.
 *
 * @param socket the JDK channel to wrap
 */
public NioDatagramChannel(DatagramChannel socket) {
    super(null, socket, SelectionKey.OP_READ);
    this.config = new NioDatagramChannelConfig(this, socket);
}
152
153 @Override
154 public ChannelMetadata metadata() {
155 return METADATA;
156 }
157
158 @Override
159 public DatagramChannelConfig config() {
160 return config;
161 }
162
163 @Override
164 @SuppressWarnings("deprecation")
165 public boolean isActive() {
166 DatagramChannel ch = javaChannel();
167 return ch.isOpen() && (
168 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
169 || ch.socket().isBound());
170 }
171
172 @Override
173 public boolean isConnected() {
174 return javaChannel().isConnected();
175 }
176
177 @Override
178 protected DatagramChannel javaChannel() {
179 return (DatagramChannel) super.javaChannel();
180 }
181
182 @Override
183 protected SocketAddress localAddress0() {
184 return javaChannel().socket().getLocalSocketAddress();
185 }
186
187 @Override
188 protected SocketAddress remoteAddress0() {
189 return javaChannel().socket().getRemoteSocketAddress();
190 }
191
192 @Override
193 protected void doBind(SocketAddress localAddress) throws Exception {
194 doBind0(localAddress);
195 }
196
197 private void doBind0(SocketAddress localAddress) throws Exception {
198 if (PlatformDependent.javaVersion() >= 7) {
199 SocketUtils.bind(javaChannel(), localAddress);
200 } else {
201 javaChannel().socket().bind(localAddress);
202 }
203 }
204
205 @Override
206 protected boolean doConnect(SocketAddress remoteAddress,
207 SocketAddress localAddress) throws Exception {
208 if (localAddress != null) {
209 doBind0(localAddress);
210 }
211
212 boolean success = false;
213 try {
214 javaChannel().connect(remoteAddress);
215 success = true;
216 return true;
217 } finally {
218 if (!success) {
219 doClose();
220 }
221 }
222 }
223
224 @Override
225 protected void doFinishConnect() throws Exception {
226 throw new Error();
227 }
228
229 @Override
230 protected void doDisconnect() throws Exception {
231 javaChannel().disconnect();
232 }
233
234 @Override
235 protected void doClose() throws Exception {
236 javaChannel().close();
237 }
238
239 @Override
240 protected int doReadMessages(List<Object> buf) throws Exception {
241 DatagramChannel ch = javaChannel();
242 DatagramChannelConfig config = config();
243 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
244 if (allocHandle == null) {
245 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
246 }
247 ByteBuf data = allocHandle.allocate(config.getAllocator());
248 boolean free = true;
249 try {
250 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
251 int pos = nioData.position();
252 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
253 if (remoteAddress == null) {
254 return 0;
255 }
256
257 int readBytes = nioData.position() - pos;
258 data.writerIndex(data.writerIndex() + readBytes);
259 allocHandle.record(readBytes);
260
261 buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
262 free = false;
263 return 1;
264 } catch (Throwable cause) {
265 PlatformDependent.throwException(cause);
266 return -1;
267 } finally {
268 if (free) {
269 data.release();
270 }
271 }
272 }
273
274 @Override
275 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
276 final SocketAddress remoteAddress;
277 final ByteBuf data;
278 if (msg instanceof AddressedEnvelope) {
279 @SuppressWarnings("unchecked")
280 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
281 remoteAddress = envelope.recipient();
282 data = envelope.content();
283 } else {
284 data = (ByteBuf) msg;
285 remoteAddress = null;
286 }
287
288 final int dataLen = data.readableBytes();
289 if (dataLen == 0) {
290 return true;
291 }
292
293 final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
294 final int writtenBytes;
295 if (remoteAddress != null) {
296 writtenBytes = javaChannel().send(nioData, remoteAddress);
297 } else {
298 writtenBytes = javaChannel().write(nioData);
299 }
300 return writtenBytes > 0;
301 }
302
303 @Override
304 protected Object filterOutboundMessage(Object msg) {
305 if (msg instanceof DatagramPacket) {
306 DatagramPacket p = (DatagramPacket) msg;
307 ByteBuf content = p.content();
308 if (isSingleDirectBuffer(content)) {
309 return p;
310 }
311 return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
312 }
313
314 if (msg instanceof ByteBuf) {
315 ByteBuf buf = (ByteBuf) msg;
316 if (isSingleDirectBuffer(buf)) {
317 return buf;
318 }
319 return newDirectBuffer(buf);
320 }
321
322 if (msg instanceof AddressedEnvelope) {
323 @SuppressWarnings("unchecked")
324 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325 if (e.content() instanceof ByteBuf) {
326 ByteBuf content = (ByteBuf) e.content();
327 if (isSingleDirectBuffer(content)) {
328 return e;
329 }
330 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
331 }
332 }
333
334 throw new UnsupportedOperationException(
335 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
336 }
337
338
339
340
341
342 private static boolean isSingleDirectBuffer(ByteBuf buf) {
343 return buf.isDirect() && buf.nioBufferCount() == 1;
344 }
345
346 @Override
347 protected boolean continueOnWriteError() {
348
349
350
351 return true;
352 }
353
354 @Override
355 public InetSocketAddress localAddress() {
356 return (InetSocketAddress) super.localAddress();
357 }
358
359 @Override
360 public InetSocketAddress remoteAddress() {
361 return (InetSocketAddress) super.remoteAddress();
362 }
363
364 @Override
365 public ChannelFuture joinGroup(InetAddress multicastAddress) {
366 return joinGroup(multicastAddress, newPromise());
367 }
368
369 @Override
370 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
371 try {
372 return joinGroup(
373 multicastAddress,
374 NetworkInterface.getByInetAddress(localAddress().getAddress()),
375 null, promise);
376 } catch (SocketException e) {
377 promise.setFailure(e);
378 }
379 return promise;
380 }
381
382 @Override
383 public ChannelFuture joinGroup(
384 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
385 return joinGroup(multicastAddress, networkInterface, newPromise());
386 }
387
388 @Override
389 public ChannelFuture joinGroup(
390 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
391 ChannelPromise promise) {
392 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
393 }
394
395 @Override
396 public ChannelFuture joinGroup(
397 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
398 return joinGroup(multicastAddress, networkInterface, source, newPromise());
399 }
400
401 @Override
402 public ChannelFuture joinGroup(
403 InetAddress multicastAddress, NetworkInterface networkInterface,
404 InetAddress source, ChannelPromise promise) {
405
406 checkJavaVersion();
407
408 if (multicastAddress == null) {
409 throw new NullPointerException("multicastAddress");
410 }
411
412 if (networkInterface == null) {
413 throw new NullPointerException("networkInterface");
414 }
415
416 try {
417 MembershipKey key;
418 if (source == null) {
419 key = javaChannel().join(multicastAddress, networkInterface);
420 } else {
421 key = javaChannel().join(multicastAddress, networkInterface, source);
422 }
423
424 synchronized (this) {
425 List<MembershipKey> keys = null;
426 if (memberships == null) {
427 memberships = new HashMap<InetAddress, List<MembershipKey>>();
428 } else {
429 keys = memberships.get(multicastAddress);
430 }
431 if (keys == null) {
432 keys = new ArrayList<MembershipKey>();
433 memberships.put(multicastAddress, keys);
434 }
435 keys.add(key);
436 }
437
438 promise.setSuccess();
439 } catch (Throwable e) {
440 promise.setFailure(e);
441 }
442
443 return promise;
444 }
445
446 @Override
447 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
448 return leaveGroup(multicastAddress, newPromise());
449 }
450
451 @Override
452 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
453 try {
454 return leaveGroup(
455 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
456 } catch (SocketException e) {
457 promise.setFailure(e);
458 }
459 return promise;
460 }
461
462 @Override
463 public ChannelFuture leaveGroup(
464 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
465 return leaveGroup(multicastAddress, networkInterface, newPromise());
466 }
467
468 @Override
469 public ChannelFuture leaveGroup(
470 InetSocketAddress multicastAddress,
471 NetworkInterface networkInterface, ChannelPromise promise) {
472 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
473 }
474
475 @Override
476 public ChannelFuture leaveGroup(
477 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
478 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
479 }
480
481 @Override
482 public ChannelFuture leaveGroup(
483 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
484 ChannelPromise promise) {
485 checkJavaVersion();
486
487 if (multicastAddress == null) {
488 throw new NullPointerException("multicastAddress");
489 }
490 if (networkInterface == null) {
491 throw new NullPointerException("networkInterface");
492 }
493
494 synchronized (this) {
495 if (memberships != null) {
496 List<MembershipKey> keys = memberships.get(multicastAddress);
497 if (keys != null) {
498 Iterator<MembershipKey> keyIt = keys.iterator();
499
500 while (keyIt.hasNext()) {
501 MembershipKey key = keyIt.next();
502 if (networkInterface.equals(key.networkInterface())) {
503 if (source == null && key.sourceAddress() == null ||
504 source != null && source.equals(key.sourceAddress())) {
505 key.drop();
506 keyIt.remove();
507 }
508 }
509 }
510 if (keys.isEmpty()) {
511 memberships.remove(multicastAddress);
512 }
513 }
514 }
515 }
516
517 promise.setSuccess();
518 return promise;
519 }
520
521
522
523
524 @Override
525 public ChannelFuture block(
526 InetAddress multicastAddress, NetworkInterface networkInterface,
527 InetAddress sourceToBlock) {
528 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
529 }
530
531
532
533
534 @Override
535 public ChannelFuture block(
536 InetAddress multicastAddress, NetworkInterface networkInterface,
537 InetAddress sourceToBlock, ChannelPromise promise) {
538 checkJavaVersion();
539
540 if (multicastAddress == null) {
541 throw new NullPointerException("multicastAddress");
542 }
543 if (sourceToBlock == null) {
544 throw new NullPointerException("sourceToBlock");
545 }
546
547 if (networkInterface == null) {
548 throw new NullPointerException("networkInterface");
549 }
550 synchronized (this) {
551 if (memberships != null) {
552 List<MembershipKey> keys = memberships.get(multicastAddress);
553 for (MembershipKey key: keys) {
554 if (networkInterface.equals(key.networkInterface())) {
555 try {
556 key.block(sourceToBlock);
557 } catch (IOException e) {
558 promise.setFailure(e);
559 }
560 }
561 }
562 }
563 }
564 promise.setSuccess();
565 return promise;
566 }
567
568
569
570
571
572 @Override
573 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
574 return block(multicastAddress, sourceToBlock, newPromise());
575 }
576
577
578
579
580
581 @Override
582 public ChannelFuture block(
583 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
584 try {
585 return block(
586 multicastAddress,
587 NetworkInterface.getByInetAddress(localAddress().getAddress()),
588 sourceToBlock, promise);
589 } catch (SocketException e) {
590 promise.setFailure(e);
591 }
592 return promise;
593 }
594
595 @Override
596 protected void setReadPending(boolean readPending) {
597 super.setReadPending(readPending);
598 }
599
600 @Override
601 protected boolean closeOnReadError(Throwable cause) {
602
603
604 if (cause instanceof SocketException) {
605 return false;
606 }
607 return super.closeOnReadError(cause);
608 }
609 }