1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.net.NetworkInterface;
24 import java.net.SocketAddress;
25 import java.net.SocketException;
26 import java.nio.channels.DatagramChannel;
27 import java.nio.channels.MembershipKey;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33
34 import org.jboss.netty.channel.ChannelException;
35 import org.jboss.netty.channel.ChannelFactory;
36 import org.jboss.netty.channel.ChannelFuture;
37 import org.jboss.netty.channel.ChannelPipeline;
38 import org.jboss.netty.channel.ChannelSink;
39 import org.jboss.netty.channel.socket.DatagramChannelConfig;
40 import org.jboss.netty.channel.socket.InternetProtocolFamily;
41 import org.jboss.netty.util.internal.DetectionUtil;
42
43
44
45
46 public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
47 implements org.jboss.netty.channel.socket.DatagramChannel {
48
49
50
51
52 private final NioDatagramChannelConfig config;
53 private Map<InetAddress, List<MembershipKey>> memberships;
54
55 NioDatagramChannel(final ChannelFactory factory,
56 final ChannelPipeline pipeline, final ChannelSink sink,
57 final NioDatagramWorker worker, InternetProtocolFamily family) {
58 super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family));
59 config = new DefaultNioDatagramChannelConfig(channel);
60
61 fireChannelOpen(this);
62
63 }
64
65 private static DatagramChannel openNonBlockingChannel(InternetProtocolFamily family) {
66 try {
67 final DatagramChannel channel;
68
69
70 if (DetectionUtil.javaVersion() < 7 || family == null) {
71 channel = DatagramChannel.open();
72 } else {
73
74
75
76
77
78 switch (family) {
79 case IPv4:
80 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
81 break;
82
83 case IPv6:
84 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
85 break;
86
87 default:
88 throw new IllegalArgumentException();
89 }
90 }
91
92 channel.configureBlocking(false);
93 return channel;
94 } catch (final IOException e) {
95 throw new ChannelException("Failed to open a DatagramChannel.", e);
96 }
97 }
98
99
100
101 @Override
102 public NioDatagramWorker getWorker() {
103 return (NioDatagramWorker) super.getWorker();
104 }
105
106 public boolean isBound() {
107 return isOpen() && channel.socket().isBound();
108 }
109
110 public boolean isConnected() {
111 return channel.isConnected();
112 }
113
114 @Override
115 protected boolean setClosed() {
116 return super.setClosed();
117 }
118
119 @Override
120 public NioDatagramChannelConfig getConfig() {
121 return config;
122 }
123
124 DatagramChannel getDatagramChannel() {
125 return channel;
126 }
127
128
129
130 public ChannelFuture joinGroup(InetAddress multicastAddress) {
131 try {
132 return joinGroup(
133 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
134 } catch (SocketException e) {
135 return failedFuture(this, e);
136 }
137 }
138
139
140 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
141 return joinGroup(multicastAddress.getAddress(), networkInterface, null);
142 }
143
144
145
146
147 public ChannelFuture joinGroup(
148 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
149 if (DetectionUtil.javaVersion() < 7) {
150 throw new UnsupportedOperationException();
151 } else {
152 if (multicastAddress == null) {
153 throw new NullPointerException("multicastAddress");
154 }
155
156 if (networkInterface == null) {
157 throw new NullPointerException("networkInterface");
158 }
159
160 try {
161 MembershipKey key;
162 if (source == null) {
163 key = channel.join(multicastAddress, networkInterface);
164 } else {
165 key = channel.join(multicastAddress, networkInterface, source);
166 }
167
168 synchronized (this) {
169 if (memberships == null) {
170 memberships = new HashMap<InetAddress, List<MembershipKey>>();
171
172 }
173 List<MembershipKey> keys = memberships.get(multicastAddress);
174 if (keys == null) {
175 keys = new ArrayList<MembershipKey>();
176 memberships.put(multicastAddress, keys);
177 }
178 keys.add(key);
179 }
180 } catch (Throwable e) {
181 return failedFuture(this, e);
182 }
183 }
184 return succeededFuture(this);
185 }
186
187 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
188 try {
189 return leaveGroup(
190 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
191 } catch (SocketException e) {
192 return failedFuture(this, e);
193 }
194
195 }
196
197 public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
198 NetworkInterface networkInterface) {
199 return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
200 }
201
202
203
204
205 public ChannelFuture leaveGroup(InetAddress multicastAddress,
206 NetworkInterface networkInterface, InetAddress source) {
207 if (DetectionUtil.javaVersion() < 7) {
208 throw new UnsupportedOperationException();
209 } else {
210 if (multicastAddress == null) {
211 throw new NullPointerException("multicastAddress");
212 }
213
214 if (networkInterface == null) {
215 throw new NullPointerException("networkInterface");
216 }
217
218 synchronized (this) {
219 if (memberships != null) {
220 List<MembershipKey> keys = memberships.get(multicastAddress);
221 if (keys != null) {
222 Iterator<MembershipKey> keyIt = keys.iterator();
223
224 while (keyIt.hasNext()) {
225 MembershipKey key = keyIt.next();
226 if (networkInterface.equals(key.networkInterface())) {
227 if (source == null && key.sourceAddress() == null ||
228 source != null && source.equals(key.sourceAddress())) {
229 key.drop();
230 keyIt.remove();
231 }
232
233 }
234 }
235 if (keys.isEmpty()) {
236 memberships.remove(multicastAddress);
237 }
238 }
239 }
240 }
241 return succeededFuture(this);
242 }
243 }
244
245
246
247
248
249 public ChannelFuture block(InetAddress multicastAddress,
250 NetworkInterface networkInterface, InetAddress sourceToBlock) {
251 if (DetectionUtil.javaVersion() < 7) {
252 throw new UnsupportedOperationException();
253 } else {
254 if (multicastAddress == null) {
255 throw new NullPointerException("multicastAddress");
256 }
257 if (sourceToBlock == null) {
258 throw new NullPointerException("sourceToBlock");
259 }
260
261 if (networkInterface == null) {
262 throw new NullPointerException("networkInterface");
263 }
264 synchronized (this) {
265 if (memberships != null) {
266 List<MembershipKey> keys = memberships.get(multicastAddress);
267 for (MembershipKey key: keys) {
268 if (networkInterface.equals(key.networkInterface())) {
269 try {
270 key.block(sourceToBlock);
271 } catch (IOException e) {
272 return failedFuture(this, e);
273 }
274 }
275 }
276 }
277 }
278 return succeededFuture(this);
279
280
281 }
282 }
283
284
285
286
287
288 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
289 try {
290 block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
291 } catch (SocketException e) {
292 return failedFuture(this, e);
293 }
294 return succeededFuture(this);
295
296 }
297 @Override
298 InetSocketAddress getLocalSocketAddress() throws Exception {
299 return (InetSocketAddress) channel.socket().getLocalSocketAddress();
300 }
301
302 @Override
303 InetSocketAddress getRemoteSocketAddress() throws Exception {
304 return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
305 }
306
307 @Override
308 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
309 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
310 return super.write(message, null);
311 } else {
312 return super.write(message, remoteAddress);
313 }
314
315 }
316 }