1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOption;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.oio.AbstractOioMessageChannel;
30 import io.netty.channel.socket.DatagramChannel;
31 import io.netty.channel.socket.DatagramChannelConfig;
32 import io.netty.channel.socket.DatagramPacket;
33 import io.netty.util.internal.EmptyArrays;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.io.IOException;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.MulticastSocket;
43 import java.net.NetworkInterface;
44 import java.net.SocketAddress;
45 import java.net.SocketException;
46 import java.net.SocketTimeoutException;
47 import java.nio.channels.NotYetConnectedException;
48 import java.nio.channels.UnresolvedAddressException;
49 import java.util.List;
50 import java.util.Locale;
51
52
53
54
55
56
57
58
59
60 @Deprecated
61 public class OioDatagramChannel extends AbstractOioMessageChannel
62 implements DatagramChannel {
63
64 private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
65
66 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
67 private static final String EXPECTED_TYPES =
68 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
69 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
70 StringUtil.simpleClassName(ByteBuf.class) + ", " +
71 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
72 StringUtil.simpleClassName(ByteBuf.class) + ')';
73
74 private final MulticastSocket socket;
75 private final OioDatagramChannelConfig config;
76 private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
77
78 private static MulticastSocket newSocket() {
79 try {
80 return new MulticastSocket(null);
81 } catch (Exception e) {
82 throw new ChannelException("failed to create a new socket", e);
83 }
84 }
85
86
87
88
89 public OioDatagramChannel() {
90 this(newSocket());
91 }
92
93
94
95
96
97
98 public OioDatagramChannel(MulticastSocket socket) {
99 super(null);
100
101 boolean success = false;
102 try {
103 socket.setSoTimeout(SO_TIMEOUT);
104 socket.setBroadcast(false);
105 success = true;
106 } catch (SocketException e) {
107 throw new ChannelException(
108 "Failed to configure the datagram socket timeout.", e);
109 } finally {
110 if (!success) {
111 socket.close();
112 }
113 }
114
115 this.socket = socket;
116 config = new DefaultOioDatagramChannelConfig(this, socket);
117 }
118
119 @Override
120 public ChannelMetadata metadata() {
121 return METADATA;
122 }
123
124
125
126
127
128
129 @Override
130
131 public DatagramChannelConfig config() {
132 return config;
133 }
134
135 @Override
136 public boolean isOpen() {
137 return !socket.isClosed();
138 }
139
140 @Override
141 @SuppressWarnings("deprecation")
142 public boolean isActive() {
143 return isOpen()
144 && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
145 || socket.isBound());
146 }
147
148 @Override
149 public boolean isConnected() {
150 return socket.isConnected();
151 }
152
153 @Override
154 protected SocketAddress localAddress0() {
155 return socket.getLocalSocketAddress();
156 }
157
158 @Override
159 protected SocketAddress remoteAddress0() {
160 return socket.getRemoteSocketAddress();
161 }
162
163 @Override
164 protected void doBind(SocketAddress localAddress) throws Exception {
165 socket.bind(localAddress);
166 }
167
168 @Override
169 public InetSocketAddress localAddress() {
170 return (InetSocketAddress) super.localAddress();
171 }
172
173 @Override
174 public InetSocketAddress remoteAddress() {
175 return (InetSocketAddress) super.remoteAddress();
176 }
177
178 @Override
179 protected void doConnect(SocketAddress remoteAddress,
180 SocketAddress localAddress) throws Exception {
181 if (localAddress != null) {
182 socket.bind(localAddress);
183 }
184
185 boolean success = false;
186 try {
187 socket.connect(remoteAddress);
188 success = true;
189 } finally {
190 if (!success) {
191 try {
192 socket.close();
193 } catch (Throwable t) {
194 logger.warn("Failed to close a socket.", t);
195 }
196 }
197 }
198 }
199
200 @Override
201 protected void doDisconnect() throws Exception {
202 socket.disconnect();
203 }
204
205 @Override
206 protected void doClose() throws Exception {
207 socket.close();
208 }
209
210 @Override
211 protected int doReadMessages(List<Object> buf) throws Exception {
212 DatagramChannelConfig config = config();
213 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
214
215 ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
216 boolean free = true;
217 try {
218
219 tmpPacket.setAddress(null);
220 tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
221 socket.receive(tmpPacket);
222
223 InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
224
225 allocHandle.lastBytesRead(tmpPacket.getLength());
226 buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
227 free = false;
228 return 1;
229 } catch (SocketTimeoutException e) {
230
231 return 0;
232 } catch (SocketException e) {
233 if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
234 throw e;
235 }
236 return -1;
237 } catch (Throwable cause) {
238 PlatformDependent.throwException(cause);
239 return -1;
240 } finally {
241 if (free) {
242 data.release();
243 }
244 }
245 }
246
247 @Override
248 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
249 for (;;) {
250 final Object o = in.current();
251 if (o == null) {
252 break;
253 }
254
255 final ByteBuf data;
256 final SocketAddress remoteAddress;
257 if (o instanceof AddressedEnvelope) {
258 @SuppressWarnings("unchecked")
259 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
260 remoteAddress = envelope.recipient();
261 data = envelope.content();
262 } else {
263 data = (ByteBuf) o;
264 remoteAddress = null;
265 }
266
267 final int length = data.readableBytes();
268 try {
269 if (remoteAddress != null) {
270 tmpPacket.setSocketAddress(remoteAddress);
271 } else {
272 if (!isConnected()) {
273
274
275 throw new NotYetConnectedException();
276 }
277
278 tmpPacket.setAddress(null);
279 }
280 if (data.hasArray()) {
281 tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
282 } else {
283 tmpPacket.setData(ByteBufUtil.getBytes(data, data.readerIndex(), length));
284 }
285 socket.send(tmpPacket);
286 in.remove();
287 } catch (Exception e) {
288
289
290
291 in.remove(e);
292 }
293 }
294 }
295
296 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297 if (envelope.recipient() instanceof InetSocketAddress
298 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299 throw new UnresolvedAddressException();
300 }
301 }
302
303 @Override
304 protected Object filterOutboundMessage(Object msg) {
305 if (msg instanceof DatagramPacket) {
306 checkUnresolved((DatagramPacket) msg);
307 return msg;
308 }
309
310 if (msg instanceof ByteBuf) {
311 return msg;
312 }
313
314 if (msg instanceof AddressedEnvelope) {
315 @SuppressWarnings("unchecked")
316 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
317 checkUnresolved(e);
318 if (e.content() instanceof ByteBuf) {
319 return msg;
320 }
321 }
322
323 throw new UnsupportedOperationException(
324 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
325 }
326
327 @Override
328 public ChannelFuture joinGroup(InetAddress multicastAddress) {
329 return joinGroup(multicastAddress, newPromise());
330 }
331
332 @Override
333 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
334 ensureBound();
335 try {
336 socket.joinGroup(multicastAddress);
337 promise.setSuccess();
338 } catch (IOException e) {
339 promise.setFailure(e);
340 }
341 return promise;
342 }
343
344 @Override
345 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
346 return joinGroup(multicastAddress, networkInterface, newPromise());
347 }
348
349 @Override
350 public ChannelFuture joinGroup(
351 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
352 ChannelPromise promise) {
353 ensureBound();
354 try {
355 socket.joinGroup(multicastAddress, networkInterface);
356 promise.setSuccess();
357 } catch (IOException e) {
358 promise.setFailure(e);
359 }
360 return promise;
361 }
362
363 @Override
364 public ChannelFuture joinGroup(
365 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
366 return newFailedFuture(new UnsupportedOperationException());
367 }
368
369 @Override
370 public ChannelFuture joinGroup(
371 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
372 ChannelPromise promise) {
373 promise.setFailure(new UnsupportedOperationException());
374 return promise;
375 }
376
377 private void ensureBound() {
378 if (!isActive()) {
379 throw new IllegalStateException(
380 DatagramChannel.class.getName() +
381 " must be bound to join a group.");
382 }
383 }
384
385 @Override
386 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
387 return leaveGroup(multicastAddress, newPromise());
388 }
389
390 @Override
391 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
392 try {
393 socket.leaveGroup(multicastAddress);
394 promise.setSuccess();
395 } catch (IOException e) {
396 promise.setFailure(e);
397 }
398 return promise;
399 }
400
401 @Override
402 public ChannelFuture leaveGroup(
403 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
404 return leaveGroup(multicastAddress, networkInterface, newPromise());
405 }
406
407 @Override
408 public ChannelFuture leaveGroup(
409 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
410 ChannelPromise promise) {
411 try {
412 socket.leaveGroup(multicastAddress, networkInterface);
413 promise.setSuccess();
414 } catch (IOException e) {
415 promise.setFailure(e);
416 }
417 return promise;
418 }
419
420 @Override
421 public ChannelFuture leaveGroup(
422 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
423 return newFailedFuture(new UnsupportedOperationException());
424 }
425
426 @Override
427 public ChannelFuture leaveGroup(
428 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
429 ChannelPromise promise) {
430 promise.setFailure(new UnsupportedOperationException());
431 return promise;
432 }
433
434 @Override
435 public ChannelFuture block(InetAddress multicastAddress,
436 NetworkInterface networkInterface, InetAddress sourceToBlock) {
437 return newFailedFuture(new UnsupportedOperationException());
438 }
439
440 @Override
441 public ChannelFuture block(InetAddress multicastAddress,
442 NetworkInterface networkInterface, InetAddress sourceToBlock,
443 ChannelPromise promise) {
444 promise.setFailure(new UnsupportedOperationException());
445 return promise;
446 }
447
448 @Override
449 public ChannelFuture block(InetAddress multicastAddress,
450 InetAddress sourceToBlock) {
451 return newFailedFuture(new UnsupportedOperationException());
452 }
453
454 @Override
455 public ChannelFuture block(InetAddress multicastAddress,
456 InetAddress sourceToBlock, ChannelPromise promise) {
457 promise.setFailure(new UnsupportedOperationException());
458 return promise;
459 }
460 }