View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelException;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOption;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.oio.AbstractOioMessageChannel;
30  import io.netty.channel.socket.DatagramChannel;
31  import io.netty.channel.socket.DatagramChannelConfig;
32  import io.netty.channel.socket.DatagramPacket;
33  import io.netty.util.internal.EmptyArrays;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.io.IOException;
40  import java.net.InetAddress;
41  import java.net.InetSocketAddress;
42  import java.net.MulticastSocket;
43  import java.net.NetworkInterface;
44  import java.net.SocketAddress;
45  import java.net.SocketException;
46  import java.net.SocketTimeoutException;
47  import java.nio.channels.NotYetConnectedException;
48  import java.util.List;
49  import java.util.Locale;
50  
51  /**
52   * An OIO datagram {@link Channel} that sends and receives an
53   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
54   *
55   * @see AddressedEnvelope
56   * @see DatagramPacket
57   * @deprecated use NIO / EPOLL / KQUEUE transport.
58   */
59  @Deprecated
60  public class OioDatagramChannel extends AbstractOioMessageChannel
61                                  implements DatagramChannel {
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
64  
65      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
66      private static final String EXPECTED_TYPES =
67              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
68              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
69              StringUtil.simpleClassName(ByteBuf.class) + ", " +
70              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
71              StringUtil.simpleClassName(ByteBuf.class) + ')';
72  
73      private final MulticastSocket socket;
74      private final OioDatagramChannelConfig config;
75      private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
76  
77      private static MulticastSocket newSocket() {
78          try {
79              return new MulticastSocket(null);
80          } catch (Exception e) {
81              throw new ChannelException("failed to create a new socket", e);
82          }
83      }
84  
85      /**
86       * Create a new instance with an new {@link MulticastSocket}.
87       */
88      public OioDatagramChannel() {
89          this(newSocket());
90      }
91  
92      /**
93       * Create a new instance from the given {@link MulticastSocket}.
94       *
95       * @param socket    the {@link MulticastSocket} which is used by this instance
96       */
97      public OioDatagramChannel(MulticastSocket socket) {
98          super(null);
99  
100         boolean success = false;
101         try {
102             socket.setSoTimeout(SO_TIMEOUT);
103             socket.setBroadcast(false);
104             success = true;
105         } catch (SocketException e) {
106             throw new ChannelException(
107                     "Failed to configure the datagram socket timeout.", e);
108         } finally {
109             if (!success) {
110                 socket.close();
111             }
112         }
113 
114         this.socket = socket;
115         config = new DefaultOioDatagramChannelConfig(this, socket);
116     }
117 
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
122 
123     /**
124      * {@inheritDoc}
125      *
126      * This can be safetly cast to {@link OioDatagramChannelConfig}.
127      */
128     @Override
129     // TODO: Change return type to OioDatagramChannelConfig in next major release
130     public DatagramChannelConfig config() {
131         return config;
132     }
133 
134     @Override
135     public boolean isOpen() {
136         return !socket.isClosed();
137     }
138 
139     @Override
140     @SuppressWarnings("deprecation")
141     public boolean isActive() {
142         return isOpen()
143             && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
144                  || socket.isBound());
145     }
146 
147     @Override
148     public boolean isConnected() {
149         return socket.isConnected();
150     }
151 
152     @Override
153     protected SocketAddress localAddress0() {
154         return socket.getLocalSocketAddress();
155     }
156 
157     @Override
158     protected SocketAddress remoteAddress0() {
159         return socket.getRemoteSocketAddress();
160     }
161 
162     @Override
163     protected void doBind(SocketAddress localAddress) throws Exception {
164         socket.bind(localAddress);
165     }
166 
167     @Override
168     public InetSocketAddress localAddress() {
169         return (InetSocketAddress) super.localAddress();
170     }
171 
172     @Override
173     public InetSocketAddress remoteAddress() {
174         return (InetSocketAddress) super.remoteAddress();
175     }
176 
177     @Override
178     protected void doConnect(SocketAddress remoteAddress,
179             SocketAddress localAddress) throws Exception {
180         if (localAddress != null) {
181             socket.bind(localAddress);
182         }
183 
184         boolean success = false;
185         try {
186             socket.connect(remoteAddress);
187             success = true;
188         } finally {
189             if (!success) {
190                 try {
191                     socket.close();
192                 } catch (Throwable t) {
193                     logger.warn("Failed to close a socket.", t);
194                 }
195             }
196         }
197     }
198 
199     @Override
200     protected void doDisconnect() throws Exception {
201         socket.disconnect();
202     }
203 
204     @Override
205     protected void doClose() throws Exception {
206         socket.close();
207     }
208 
209     @Override
210     protected int doReadMessages(List<Object> buf) throws Exception {
211         DatagramChannelConfig config = config();
212         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
213 
214         ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
215         boolean free = true;
216         try {
217             // Ensure we null out the address which may have been set before.
218             tmpPacket.setAddress(null);
219             tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
220             socket.receive(tmpPacket);
221 
222             InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
223 
224             allocHandle.lastBytesRead(tmpPacket.getLength());
225             buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
226             free = false;
227             return 1;
228         } catch (SocketTimeoutException e) {
229             // Expected
230             return 0;
231         } catch (SocketException e) {
232             if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
233                 throw e;
234             }
235             return -1;
236         } catch (Throwable cause) {
237             PlatformDependent.throwException(cause);
238             return -1;
239         } finally {
240             if (free) {
241                 data.release();
242             }
243         }
244     }
245 
246     @Override
247     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
248         for (;;) {
249             final Object o = in.current();
250             if (o == null) {
251                 break;
252             }
253 
254             final ByteBuf data;
255             final SocketAddress remoteAddress;
256             if (o instanceof AddressedEnvelope) {
257                 @SuppressWarnings("unchecked")
258                 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
259                 remoteAddress = envelope.recipient();
260                 data = envelope.content();
261             } else {
262                 data = (ByteBuf) o;
263                 remoteAddress = null;
264             }
265 
266             final int length = data.readableBytes();
267             try {
268                 if (remoteAddress != null) {
269                     tmpPacket.setSocketAddress(remoteAddress);
270                 } else {
271                     if (!isConnected()) {
272                         // If not connected we should throw a NotYetConnectedException() to be consistent with
273                         // NioDatagramChannel
274                         throw new NotYetConnectedException();
275                     }
276                     // Ensure we null out the address which may have been set before.
277                     tmpPacket.setAddress(null);
278                 }
279                 if (data.hasArray()) {
280                     tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
281                 } else {
282                     tmpPacket.setData(ByteBufUtil.getBytes(data, data.readerIndex(), length));
283                 }
284                 socket.send(tmpPacket);
285                 in.remove();
286             } catch (Exception e) {
287                 // Continue on write error as a DatagramChannel can write to multiple remote peers
288                 //
289                 // See https://github.com/netty/netty/issues/2665
290                 in.remove(e);
291             }
292         }
293     }
294 
295     @Override
296     protected Object filterOutboundMessage(Object msg) {
297         if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
298             return msg;
299         }
300 
301         if (msg instanceof AddressedEnvelope) {
302             @SuppressWarnings("unchecked")
303             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
304             if (e.content() instanceof ByteBuf) {
305                 return msg;
306             }
307         }
308 
309         throw new UnsupportedOperationException(
310                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
311     }
312 
313     @Override
314     public ChannelFuture joinGroup(InetAddress multicastAddress) {
315         return joinGroup(multicastAddress, newPromise());
316     }
317 
318     @Override
319     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
320         ensureBound();
321         try {
322             socket.joinGroup(multicastAddress);
323             promise.setSuccess();
324         } catch (IOException e) {
325             promise.setFailure(e);
326         }
327         return promise;
328     }
329 
330     @Override
331     public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
332         return joinGroup(multicastAddress, networkInterface, newPromise());
333     }
334 
335     @Override
336     public ChannelFuture joinGroup(
337             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
338             ChannelPromise promise) {
339         ensureBound();
340         try {
341             socket.joinGroup(multicastAddress, networkInterface);
342             promise.setSuccess();
343         } catch (IOException e) {
344             promise.setFailure(e);
345         }
346         return promise;
347     }
348 
349     @Override
350     public ChannelFuture joinGroup(
351             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
352         return newFailedFuture(new UnsupportedOperationException());
353     }
354 
355     @Override
356     public ChannelFuture joinGroup(
357             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
358             ChannelPromise promise) {
359         promise.setFailure(new UnsupportedOperationException());
360         return promise;
361     }
362 
363     private void ensureBound() {
364         if (!isActive()) {
365             throw new IllegalStateException(
366                     DatagramChannel.class.getName() +
367                     " must be bound to join a group.");
368         }
369     }
370 
371     @Override
372     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
373         return leaveGroup(multicastAddress, newPromise());
374     }
375 
376     @Override
377     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
378         try {
379             socket.leaveGroup(multicastAddress);
380             promise.setSuccess();
381         } catch (IOException e) {
382             promise.setFailure(e);
383         }
384         return promise;
385     }
386 
387     @Override
388     public ChannelFuture leaveGroup(
389             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
390         return leaveGroup(multicastAddress, networkInterface, newPromise());
391     }
392 
393     @Override
394     public ChannelFuture leaveGroup(
395             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
396             ChannelPromise promise) {
397         try {
398             socket.leaveGroup(multicastAddress, networkInterface);
399             promise.setSuccess();
400         } catch (IOException e) {
401             promise.setFailure(e);
402         }
403         return promise;
404     }
405 
406     @Override
407     public ChannelFuture leaveGroup(
408             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
409         return newFailedFuture(new UnsupportedOperationException());
410     }
411 
412     @Override
413     public ChannelFuture leaveGroup(
414             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
415             ChannelPromise promise) {
416         promise.setFailure(new UnsupportedOperationException());
417         return promise;
418     }
419 
420     @Override
421     public ChannelFuture block(InetAddress multicastAddress,
422             NetworkInterface networkInterface, InetAddress sourceToBlock) {
423         return newFailedFuture(new UnsupportedOperationException());
424     }
425 
426     @Override
427     public ChannelFuture block(InetAddress multicastAddress,
428             NetworkInterface networkInterface, InetAddress sourceToBlock,
429             ChannelPromise promise) {
430         promise.setFailure(new UnsupportedOperationException());
431         return promise;
432     }
433 
434     @Override
435     public ChannelFuture block(InetAddress multicastAddress,
436             InetAddress sourceToBlock) {
437         return newFailedFuture(new UnsupportedOperationException());
438     }
439 
440     @Override
441     public ChannelFuture block(InetAddress multicastAddress,
442             InetAddress sourceToBlock, ChannelPromise promise) {
443         promise.setFailure(new UnsupportedOperationException());
444         return promise;
445     }
446 }