1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.ChannelException;
19 import org.jboss.netty.channel.ChannelFactory;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.channel.ChannelPipeline;
22 import org.jboss.netty.channel.ChannelSink;
23 import org.jboss.netty.channel.socket.DatagramChannelConfig;
24 import org.jboss.netty.channel.socket.InternetProtocolFamily;
25 import org.jboss.netty.util.internal.DetectionUtil;
26
27 import java.io.IOException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.NetworkInterface;
31 import java.net.SocketAddress;
32 import java.net.SocketException;
33 import java.nio.channels.DatagramChannel;
34 import java.nio.channels.MembershipKey;
35 import java.util.ArrayList;
36 import java.util.HashMap;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40
41 import static org.jboss.netty.channel.Channels.*;
42
43
44
45
46 public class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
47 implements org.jboss.netty.channel.socket.DatagramChannel {
48
49
50
51
52 private final NioDatagramChannelConfig config;
53 private Map<InetAddress, List<MembershipKey>> memberships;
54
55 NioDatagramChannel(final ChannelFactory factory,
56 final ChannelPipeline pipeline, final ChannelSink sink,
57 final NioDatagramWorker worker, InternetProtocolFamily family) {
58 super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family));
59 config = new DefaultNioDatagramChannelConfig(channel);
60
61 fireChannelOpen(this);
62 }
63
64 private static DatagramChannel openNonBlockingChannel(InternetProtocolFamily family) {
65 try {
66 final DatagramChannel channel;
67
68
69 if (DetectionUtil.javaVersion() < 7 || family == null) {
70 channel = DatagramChannel.open();
71 } else {
72
73
74
75
76
77 switch (family) {
78 case IPv4:
79 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
80 break;
81
82 case IPv6:
83 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
84 break;
85
86 default:
87 throw new IllegalArgumentException();
88 }
89 }
90
91 channel.configureBlocking(false);
92 return channel;
93 } catch (final IOException e) {
94 throw new ChannelException("Failed to open a DatagramChannel.", e);
95 }
96 }
97
98 @Override
99 public NioDatagramWorker getWorker() {
100 return (NioDatagramWorker) super.getWorker();
101 }
102
103 public boolean isBound() {
104 return isOpen() && channel.socket().isBound();
105 }
106
107 public boolean isConnected() {
108 return channel.isConnected();
109 }
110
111 @Override
112 protected boolean setClosed() {
113 return super.setClosed();
114 }
115
116 @Override
117 public NioDatagramChannelConfig getConfig() {
118 return config;
119 }
120
121 DatagramChannel getDatagramChannel() {
122 return channel;
123 }
124
125 public ChannelFuture joinGroup(InetAddress multicastAddress) {
126 try {
127 return joinGroup(
128 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
129 } catch (SocketException e) {
130 return failedFuture(this, e);
131 }
132 }
133
134 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
135 return joinGroup(multicastAddress.getAddress(), networkInterface, null);
136 }
137
138
139
140
141 public ChannelFuture joinGroup(
142 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
143 if (DetectionUtil.javaVersion() < 7) {
144 throw new UnsupportedOperationException();
145 }
146
147 if (multicastAddress == null) {
148 throw new NullPointerException("multicastAddress");
149 }
150
151 if (networkInterface == null) {
152 throw new NullPointerException("networkInterface");
153 }
154
155 try {
156 MembershipKey key;
157 if (source == null) {
158 key = channel.join(multicastAddress, networkInterface);
159 } else {
160 key = channel.join(multicastAddress, networkInterface, source);
161 }
162
163 synchronized (this) {
164 if (memberships == null) {
165 memberships = new HashMap<InetAddress, List<MembershipKey>>();
166 }
167 List<MembershipKey> keys = memberships.get(multicastAddress);
168 if (keys == null) {
169 keys = new ArrayList<MembershipKey>();
170 memberships.put(multicastAddress, keys);
171 }
172 keys.add(key);
173 }
174 } catch (Throwable e) {
175 return failedFuture(this, e);
176 }
177 return succeededFuture(this);
178 }
179
180 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
181 try {
182 return leaveGroup(
183 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
184 } catch (SocketException e) {
185 return failedFuture(this, e);
186 }
187 }
188
189 public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
190 NetworkInterface networkInterface) {
191 return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
192 }
193
194
195
196
197 public ChannelFuture leaveGroup(InetAddress multicastAddress,
198 NetworkInterface networkInterface, InetAddress source) {
199 if (DetectionUtil.javaVersion() < 7) {
200 throw new UnsupportedOperationException();
201 } else {
202 if (multicastAddress == null) {
203 throw new NullPointerException("multicastAddress");
204 }
205
206 if (networkInterface == null) {
207 throw new NullPointerException("networkInterface");
208 }
209
210 synchronized (this) {
211 if (memberships != null) {
212 List<MembershipKey> keys = memberships.get(multicastAddress);
213 if (keys != null) {
214 Iterator<MembershipKey> keyIt = keys.iterator();
215
216 while (keyIt.hasNext()) {
217 MembershipKey key = keyIt.next();
218 if (networkInterface.equals(key.networkInterface())) {
219 if (source == null && key.sourceAddress() == null ||
220 source != null && source.equals(key.sourceAddress())) {
221 key.drop();
222 keyIt.remove();
223 }
224 }
225 }
226 if (keys.isEmpty()) {
227 memberships.remove(multicastAddress);
228 }
229 }
230 }
231 }
232 return succeededFuture(this);
233 }
234 }
235
236
237
238
239
240 public ChannelFuture block(InetAddress multicastAddress,
241 NetworkInterface networkInterface, InetAddress sourceToBlock) {
242 if (DetectionUtil.javaVersion() < 7) {
243 throw new UnsupportedOperationException();
244 } else {
245 if (multicastAddress == null) {
246 throw new NullPointerException("multicastAddress");
247 }
248 if (sourceToBlock == null) {
249 throw new NullPointerException("sourceToBlock");
250 }
251
252 if (networkInterface == null) {
253 throw new NullPointerException("networkInterface");
254 }
255 synchronized (this) {
256 if (memberships != null) {
257 List<MembershipKey> keys = memberships.get(multicastAddress);
258 for (MembershipKey key: keys) {
259 if (networkInterface.equals(key.networkInterface())) {
260 try {
261 key.block(sourceToBlock);
262 } catch (IOException e) {
263 return failedFuture(this, e);
264 }
265 }
266 }
267 }
268 }
269 return succeededFuture(this);
270 }
271 }
272
273
274
275
276
277 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
278 try {
279 block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
280 } catch (SocketException e) {
281 return failedFuture(this, e);
282 }
283 return succeededFuture(this);
284 }
285
286 @Override
287 InetSocketAddress getLocalSocketAddress() throws Exception {
288 return (InetSocketAddress) channel.socket().getLocalSocketAddress();
289 }
290
291 @Override
292 InetSocketAddress getRemoteSocketAddress() throws Exception {
293 return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
294 }
295
296 @Override
297 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
298 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
299 return super.write(message, null);
300 } else {
301 return super.write(message, remoteAddress);
302 }
303 }
304 }