1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOption;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.oio.AbstractOioMessageChannel;
30 import io.netty.channel.socket.DatagramChannel;
31 import io.netty.channel.socket.DatagramChannelConfig;
32 import io.netty.channel.socket.DatagramPacket;
33 import io.netty.util.internal.EmptyArrays;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.io.IOException;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.MulticastSocket;
43 import java.net.NetworkInterface;
44 import java.net.SocketAddress;
45 import java.net.SocketException;
46 import java.net.SocketTimeoutException;
47 import java.nio.channels.NotYetConnectedException;
48 import java.util.List;
49 import java.util.Locale;
50
51
52
53
54
55
56
57
58
59 @Deprecated
60 public class OioDatagramChannel extends AbstractOioMessageChannel
61 implements DatagramChannel {
62
63 private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
64
65 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
66 private static final String EXPECTED_TYPES =
67 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
68 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
69 StringUtil.simpleClassName(ByteBuf.class) + ", " +
70 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
71 StringUtil.simpleClassName(ByteBuf.class) + ')';
72
73 private final MulticastSocket socket;
74 private final OioDatagramChannelConfig config;
75 private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
76
77 private static MulticastSocket newSocket() {
78 try {
79 return new MulticastSocket(null);
80 } catch (Exception e) {
81 throw new ChannelException("failed to create a new socket", e);
82 }
83 }
84
85
86
87
88 public OioDatagramChannel() {
89 this(newSocket());
90 }
91
92
93
94
95
96
97 public OioDatagramChannel(MulticastSocket socket) {
98 super(null);
99
100 boolean success = false;
101 try {
102 socket.setSoTimeout(SO_TIMEOUT);
103 socket.setBroadcast(false);
104 success = true;
105 } catch (SocketException e) {
106 throw new ChannelException(
107 "Failed to configure the datagram socket timeout.", e);
108 } finally {
109 if (!success) {
110 socket.close();
111 }
112 }
113
114 this.socket = socket;
115 config = new DefaultOioDatagramChannelConfig(this, socket);
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123
124
125
126
127
128 @Override
129
130 public DatagramChannelConfig config() {
131 return config;
132 }
133
134 @Override
135 public boolean isOpen() {
136 return !socket.isClosed();
137 }
138
139 @Override
140 @SuppressWarnings("deprecation")
141 public boolean isActive() {
142 return isOpen()
143 && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
144 || socket.isBound());
145 }
146
147 @Override
148 public boolean isConnected() {
149 return socket.isConnected();
150 }
151
152 @Override
153 protected SocketAddress localAddress0() {
154 return socket.getLocalSocketAddress();
155 }
156
157 @Override
158 protected SocketAddress remoteAddress0() {
159 return socket.getRemoteSocketAddress();
160 }
161
162 @Override
163 protected void doBind(SocketAddress localAddress) throws Exception {
164 socket.bind(localAddress);
165 }
166
167 @Override
168 public InetSocketAddress localAddress() {
169 return (InetSocketAddress) super.localAddress();
170 }
171
172 @Override
173 public InetSocketAddress remoteAddress() {
174 return (InetSocketAddress) super.remoteAddress();
175 }
176
177 @Override
178 protected void doConnect(SocketAddress remoteAddress,
179 SocketAddress localAddress) throws Exception {
180 if (localAddress != null) {
181 socket.bind(localAddress);
182 }
183
184 boolean success = false;
185 try {
186 socket.connect(remoteAddress);
187 success = true;
188 } finally {
189 if (!success) {
190 try {
191 socket.close();
192 } catch (Throwable t) {
193 logger.warn("Failed to close a socket.", t);
194 }
195 }
196 }
197 }
198
199 @Override
200 protected void doDisconnect() throws Exception {
201 socket.disconnect();
202 }
203
204 @Override
205 protected void doClose() throws Exception {
206 socket.close();
207 }
208
209 @Override
210 protected int doReadMessages(List<Object> buf) throws Exception {
211 DatagramChannelConfig config = config();
212 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
213
214 ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
215 boolean free = true;
216 try {
217
218 tmpPacket.setAddress(null);
219 tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
220 socket.receive(tmpPacket);
221
222 InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
223
224 allocHandle.lastBytesRead(tmpPacket.getLength());
225 buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
226 free = false;
227 return 1;
228 } catch (SocketTimeoutException e) {
229
230 return 0;
231 } catch (SocketException e) {
232 if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
233 throw e;
234 }
235 return -1;
236 } catch (Throwable cause) {
237 PlatformDependent.throwException(cause);
238 return -1;
239 } finally {
240 if (free) {
241 data.release();
242 }
243 }
244 }
245
246 @Override
247 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
248 for (;;) {
249 final Object o = in.current();
250 if (o == null) {
251 break;
252 }
253
254 final ByteBuf data;
255 final SocketAddress remoteAddress;
256 if (o instanceof AddressedEnvelope) {
257 @SuppressWarnings("unchecked")
258 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
259 remoteAddress = envelope.recipient();
260 data = envelope.content();
261 } else {
262 data = (ByteBuf) o;
263 remoteAddress = null;
264 }
265
266 final int length = data.readableBytes();
267 try {
268 if (remoteAddress != null) {
269 tmpPacket.setSocketAddress(remoteAddress);
270 } else {
271 if (!isConnected()) {
272
273
274 throw new NotYetConnectedException();
275 }
276
277 tmpPacket.setAddress(null);
278 }
279 if (data.hasArray()) {
280 tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
281 } else {
282 tmpPacket.setData(ByteBufUtil.getBytes(data, data.readerIndex(), length));
283 }
284 socket.send(tmpPacket);
285 in.remove();
286 } catch (Exception e) {
287
288
289
290 in.remove(e);
291 }
292 }
293 }
294
295 @Override
296 protected Object filterOutboundMessage(Object msg) {
297 if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
298 return msg;
299 }
300
301 if (msg instanceof AddressedEnvelope) {
302 @SuppressWarnings("unchecked")
303 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
304 if (e.content() instanceof ByteBuf) {
305 return msg;
306 }
307 }
308
309 throw new UnsupportedOperationException(
310 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
311 }
312
313 @Override
314 public ChannelFuture joinGroup(InetAddress multicastAddress) {
315 return joinGroup(multicastAddress, newPromise());
316 }
317
318 @Override
319 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
320 ensureBound();
321 try {
322 socket.joinGroup(multicastAddress);
323 promise.setSuccess();
324 } catch (IOException e) {
325 promise.setFailure(e);
326 }
327 return promise;
328 }
329
330 @Override
331 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
332 return joinGroup(multicastAddress, networkInterface, newPromise());
333 }
334
335 @Override
336 public ChannelFuture joinGroup(
337 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
338 ChannelPromise promise) {
339 ensureBound();
340 try {
341 socket.joinGroup(multicastAddress, networkInterface);
342 promise.setSuccess();
343 } catch (IOException e) {
344 promise.setFailure(e);
345 }
346 return promise;
347 }
348
349 @Override
350 public ChannelFuture joinGroup(
351 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
352 return newFailedFuture(new UnsupportedOperationException());
353 }
354
355 @Override
356 public ChannelFuture joinGroup(
357 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
358 ChannelPromise promise) {
359 promise.setFailure(new UnsupportedOperationException());
360 return promise;
361 }
362
363 private void ensureBound() {
364 if (!isActive()) {
365 throw new IllegalStateException(
366 DatagramChannel.class.getName() +
367 " must be bound to join a group.");
368 }
369 }
370
371 @Override
372 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
373 return leaveGroup(multicastAddress, newPromise());
374 }
375
376 @Override
377 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
378 try {
379 socket.leaveGroup(multicastAddress);
380 promise.setSuccess();
381 } catch (IOException e) {
382 promise.setFailure(e);
383 }
384 return promise;
385 }
386
387 @Override
388 public ChannelFuture leaveGroup(
389 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
390 return leaveGroup(multicastAddress, networkInterface, newPromise());
391 }
392
393 @Override
394 public ChannelFuture leaveGroup(
395 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
396 ChannelPromise promise) {
397 try {
398 socket.leaveGroup(multicastAddress, networkInterface);
399 promise.setSuccess();
400 } catch (IOException e) {
401 promise.setFailure(e);
402 }
403 return promise;
404 }
405
406 @Override
407 public ChannelFuture leaveGroup(
408 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
409 return newFailedFuture(new UnsupportedOperationException());
410 }
411
412 @Override
413 public ChannelFuture leaveGroup(
414 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
415 ChannelPromise promise) {
416 promise.setFailure(new UnsupportedOperationException());
417 return promise;
418 }
419
420 @Override
421 public ChannelFuture block(InetAddress multicastAddress,
422 NetworkInterface networkInterface, InetAddress sourceToBlock) {
423 return newFailedFuture(new UnsupportedOperationException());
424 }
425
426 @Override
427 public ChannelFuture block(InetAddress multicastAddress,
428 NetworkInterface networkInterface, InetAddress sourceToBlock,
429 ChannelPromise promise) {
430 promise.setFailure(new UnsupportedOperationException());
431 return promise;
432 }
433
434 @Override
435 public ChannelFuture block(InetAddress multicastAddress,
436 InetAddress sourceToBlock) {
437 return newFailedFuture(new UnsupportedOperationException());
438 }
439
440 @Override
441 public ChannelFuture block(InetAddress multicastAddress,
442 InetAddress sourceToBlock, ChannelPromise promise) {
443 promise.setFailure(new UnsupportedOperationException());
444 return promise;
445 }
446 }