1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelException;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.RecvByteBufAllocator;
28 import io.netty.channel.oio.AbstractOioMessageChannel;
29 import io.netty.channel.socket.DatagramChannel;
30 import io.netty.channel.socket.DatagramChannelConfig;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.util.internal.EmptyArrays;
33 import io.netty.util.internal.PlatformDependent;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.net.InetAddress;
40 import java.net.InetSocketAddress;
41 import java.net.MulticastSocket;
42 import java.net.NetworkInterface;
43 import java.net.SocketAddress;
44 import java.net.SocketException;
45 import java.net.SocketTimeoutException;
46 import java.nio.channels.NotYetConnectedException;
47 import java.util.List;
48 import java.util.Locale;
49
50
51
52
53
54
55
56
57 public class OioDatagramChannel extends AbstractOioMessageChannel
58 implements DatagramChannel {
59
60 private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
61
62 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
63 private static final String EXPECTED_TYPES =
64 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
65 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
66 StringUtil.simpleClassName(ByteBuf.class) + ", " +
67 StringUtil.simpleClassName(SocketAddress.class) + ">, " +
68 StringUtil.simpleClassName(ByteBuf.class) + ')';
69
70 private final MulticastSocket socket;
71 private final OioDatagramChannelConfig config;
72 private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
73
74 private RecvByteBufAllocator.Handle allocHandle;
75
76 private static MulticastSocket newSocket() {
77 try {
78 return new MulticastSocket(null);
79 } catch (Exception e) {
80 throw new ChannelException("failed to create a new socket", e);
81 }
82 }
83
84
85
86
87 public OioDatagramChannel() {
88 this(newSocket());
89 }
90
91
92
93
94
95
96 public OioDatagramChannel(MulticastSocket socket) {
97 super(null);
98
99 boolean success = false;
100 try {
101 socket.setSoTimeout(SO_TIMEOUT);
102 socket.setBroadcast(false);
103 success = true;
104 } catch (SocketException e) {
105 throw new ChannelException(
106 "Failed to configure the datagram socket timeout.", e);
107 } finally {
108 if (!success) {
109 socket.close();
110 }
111 }
112
113 this.socket = socket;
114 config = new DefaultOioDatagramChannelConfig(this, socket);
115 }
116
117 @Override
118 public ChannelMetadata metadata() {
119 return METADATA;
120 }
121
122
123
124
125
126
127 @Override
128
129 public DatagramChannelConfig config() {
130 return config;
131 }
132
133 @Override
134 public boolean isOpen() {
135 return !socket.isClosed();
136 }
137
138 @Override
139 @SuppressWarnings("deprecation")
140 public boolean isActive() {
141 return isOpen()
142 && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
143 || socket.isBound());
144 }
145
146 @Override
147 public boolean isConnected() {
148 return socket.isConnected();
149 }
150
151 @Override
152 protected SocketAddress localAddress0() {
153 return socket.getLocalSocketAddress();
154 }
155
156 @Override
157 protected SocketAddress remoteAddress0() {
158 return socket.getRemoteSocketAddress();
159 }
160
161 @Override
162 protected void doBind(SocketAddress localAddress) throws Exception {
163 socket.bind(localAddress);
164 }
165
166 @Override
167 public InetSocketAddress localAddress() {
168 return (InetSocketAddress) super.localAddress();
169 }
170
171 @Override
172 public InetSocketAddress remoteAddress() {
173 return (InetSocketAddress) super.remoteAddress();
174 }
175
176 @Override
177 protected void doConnect(SocketAddress remoteAddress,
178 SocketAddress localAddress) throws Exception {
179 if (localAddress != null) {
180 socket.bind(localAddress);
181 }
182
183 boolean success = false;
184 try {
185 socket.connect(remoteAddress);
186 success = true;
187 } finally {
188 if (!success) {
189 try {
190 socket.close();
191 } catch (Throwable t) {
192 logger.warn("Failed to close a socket.", t);
193 }
194 }
195 }
196 }
197
198 @Override
199 protected void doDisconnect() throws Exception {
200 socket.disconnect();
201 }
202
203 @Override
204 protected void doClose() throws Exception {
205 socket.close();
206 }
207
208 @Override
209 protected int doReadMessages(List<Object> buf) throws Exception {
210 DatagramChannelConfig config = config();
211 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
212 if (allocHandle == null) {
213 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
214 }
215
216 ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
217 boolean free = true;
218 try {
219
220 tmpPacket.setAddress(null);
221 tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
222 socket.receive(tmpPacket);
223
224 InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
225
226 int readBytes = tmpPacket.getLength();
227 allocHandle.record(readBytes);
228 buf.add(new DatagramPacket(data.writerIndex(readBytes), localAddress(), remoteAddr));
229 free = false;
230 return 1;
231 } catch (SocketTimeoutException e) {
232
233 return 0;
234 } catch (SocketException e) {
235 if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
236 throw e;
237 }
238 return -1;
239 } catch (Throwable cause) {
240 PlatformDependent.throwException(cause);
241 return -1;
242 } finally {
243 if (free) {
244 data.release();
245 }
246 }
247 }
248
249 @Override
250 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
251 for (;;) {
252 final Object o = in.current();
253 if (o == null) {
254 break;
255 }
256
257 final ByteBuf data;
258 final SocketAddress remoteAddress;
259 if (o instanceof AddressedEnvelope) {
260 @SuppressWarnings("unchecked")
261 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
262 remoteAddress = envelope.recipient();
263 data = envelope.content();
264 } else {
265 data = (ByteBuf) o;
266 remoteAddress = null;
267 }
268
269 final int length = data.readableBytes();
270 try {
271 if (remoteAddress != null) {
272 tmpPacket.setSocketAddress(remoteAddress);
273 } else {
274 if (!isConnected()) {
275
276
277 throw new NotYetConnectedException();
278 }
279
280 tmpPacket.setAddress(null);
281 }
282 if (data.hasArray()) {
283 tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
284 } else {
285 byte[] tmp = new byte[length];
286 data.getBytes(data.readerIndex(), tmp);
287 tmpPacket.setData(tmp);
288 }
289 socket.send(tmpPacket);
290 in.remove();
291 } catch (Exception e) {
292
293
294
295 in.remove(e);
296 }
297 }
298 }
299
300 @Override
301 protected Object filterOutboundMessage(Object msg) {
302 if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
303 return msg;
304 }
305
306 if (msg instanceof AddressedEnvelope) {
307 @SuppressWarnings("unchecked")
308 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
309 if (e.content() instanceof ByteBuf) {
310 return msg;
311 }
312 }
313
314 throw new UnsupportedOperationException(
315 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
316 }
317
318 @Override
319 public ChannelFuture joinGroup(InetAddress multicastAddress) {
320 return joinGroup(multicastAddress, newPromise());
321 }
322
323 @Override
324 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
325 ensureBound();
326 try {
327 socket.joinGroup(multicastAddress);
328 promise.setSuccess();
329 } catch (IOException e) {
330 promise.setFailure(e);
331 }
332 return promise;
333 }
334
335 @Override
336 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
337 return joinGroup(multicastAddress, networkInterface, newPromise());
338 }
339
340 @Override
341 public ChannelFuture joinGroup(
342 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
343 ChannelPromise promise) {
344 ensureBound();
345 try {
346 socket.joinGroup(multicastAddress, networkInterface);
347 promise.setSuccess();
348 } catch (IOException e) {
349 promise.setFailure(e);
350 }
351 return promise;
352 }
353
354 @Override
355 public ChannelFuture joinGroup(
356 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
357 return newFailedFuture(new UnsupportedOperationException());
358 }
359
360 @Override
361 public ChannelFuture joinGroup(
362 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
363 ChannelPromise promise) {
364 promise.setFailure(new UnsupportedOperationException());
365 return promise;
366 }
367
368 private void ensureBound() {
369 if (!isActive()) {
370 throw new IllegalStateException(
371 DatagramChannel.class.getName() +
372 " must be bound to join a group.");
373 }
374 }
375
376 @Override
377 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
378 return leaveGroup(multicastAddress, newPromise());
379 }
380
381 @Override
382 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
383 try {
384 socket.leaveGroup(multicastAddress);
385 promise.setSuccess();
386 } catch (IOException e) {
387 promise.setFailure(e);
388 }
389 return promise;
390 }
391
392 @Override
393 public ChannelFuture leaveGroup(
394 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
395 return leaveGroup(multicastAddress, networkInterface, newPromise());
396 }
397
398 @Override
399 public ChannelFuture leaveGroup(
400 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
401 ChannelPromise promise) {
402 try {
403 socket.leaveGroup(multicastAddress, networkInterface);
404 promise.setSuccess();
405 } catch (IOException e) {
406 promise.setFailure(e);
407 }
408 return promise;
409 }
410
411 @Override
412 public ChannelFuture leaveGroup(
413 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
414 return newFailedFuture(new UnsupportedOperationException());
415 }
416
417 @Override
418 public ChannelFuture leaveGroup(
419 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
420 ChannelPromise promise) {
421 promise.setFailure(new UnsupportedOperationException());
422 return promise;
423 }
424
425 @Override
426 public ChannelFuture block(InetAddress multicastAddress,
427 NetworkInterface networkInterface, InetAddress sourceToBlock) {
428 return newFailedFuture(new UnsupportedOperationException());
429 }
430
431 @Override
432 public ChannelFuture block(InetAddress multicastAddress,
433 NetworkInterface networkInterface, InetAddress sourceToBlock,
434 ChannelPromise promise) {
435 promise.setFailure(new UnsupportedOperationException());
436 return promise;
437 }
438
439 @Override
440 public ChannelFuture block(InetAddress multicastAddress,
441 InetAddress sourceToBlock) {
442 return newFailedFuture(new UnsupportedOperationException());
443 }
444
445 @Override
446 public ChannelFuture block(InetAddress multicastAddress,
447 InetAddress sourceToBlock, ChannelPromise promise) {
448 promise.setFailure(new UnsupportedOperationException());
449 return promise;
450 }
451 }